security: harden boundary inputs and production defaults

- validate instance names at the host lib and web boundary against
  [a-z0-9][a-z0-9_-]{0,63} to prevent path traversal via Server.name
- fail-closed on SECRET_KEY: load_config returns None when env unset,
  create_app raises if missing or "dev" outside TESTING
- close login timing oracle by hashing a dummy digest when the user
  is not found, equalizing response time
- set SESSION_COOKIE_SECURE outside TESTING
- delete_instance tolerates stop_service and fusermount3 failures so
  partially-initialized instances clean up without contract breaks;
  drops the is_mount() preflight that violated AGENTS.md
- document claim_next_job's single-process assumption
- clarify emit_step contract via docstring

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
mwiegand 2026-05-07 00:53:33 +02:00
parent 3809f85795
commit f81e839ba2
No known key found for this signature in database
15 changed files with 258 additions and 24 deletions

View file

@ -1,8 +1,9 @@
from pathlib import Path
import shutil
import subprocess
from typing import Callable
from l4d2host.paths import DEFAULT_LEFT4ME_ROOT, get_left4me_root, overlay_path
from l4d2host.paths import DEFAULT_LEFT4ME_ROOT, get_left4me_root, overlay_path, validate_instance_name
from l4d2host.process import run_command
from l4d2host.service_control import start_service, stop_service
from l4d2host.spec import load_spec
@ -24,6 +25,7 @@ def initialize_instance(
passthrough: bool = False,
should_cancel: Callable[[], bool] | None = None,
) -> None:
name = validate_instance_name(name)
root = get_left4me_root() if root is None else Path(root)
spec = load_spec(spec_path)
@ -73,6 +75,7 @@ def start_instance(
passthrough: bool = False,
should_cancel: Callable[[], bool] | None = None,
) -> None:
name = validate_instance_name(name)
root = get_left4me_root() if root is None else Path(root)
instance_dir = root / "instances" / name
runtime_dir = root / "runtime" / name
@ -122,6 +125,7 @@ def stop_instance(
passthrough: bool = False,
should_cancel: Callable[[], bool] | None = None,
) -> None:
name = validate_instance_name(name)
root = get_left4me_root() if root is None else Path(root)
emit_step("stopping systemd service...", on_stdout, passthrough)
stop_service(
@ -151,6 +155,7 @@ def delete_instance(
passthrough: bool = False,
should_cancel: Callable[[], bool] | None = None,
) -> None:
name = validate_instance_name(name)
root = get_left4me_root() if root is None else Path(root)
instance_dir = root / "instances" / name
runtime_dir = root / "runtime" / name
@ -159,6 +164,7 @@ def delete_instance(
return
emit_step("stopping systemd service (if running)...", on_stdout, passthrough)
try:
stop_service(
name,
on_stdout=on_stdout,
@ -166,17 +172,20 @@ def delete_instance(
passthrough=passthrough,
should_cancel=should_cancel,
)
except subprocess.CalledProcessError:
pass
merged = runtime_dir / "merged"
if merged.is_mount():
emit_step("unmounting runtime overlay (if mounted)...", on_stdout, passthrough)
try:
run_command(
["fusermount3", "-u", str(merged)],
["fusermount3", "-u", str(runtime_dir / "merged")],
on_stdout=on_stdout,
on_stderr=on_stderr,
passthrough=passthrough,
should_cancel=should_cancel,
)
except subprocess.CalledProcessError:
pass
emit_step("removing instance files...", on_stdout, passthrough)
if instance_dir.exists():

View file

@ -1,6 +1,7 @@
from typing import Callable
def emit_step(msg: str, on_stdout: Callable[[str], None] | None, passthrough: bool) -> None:
"""Emit a `Step: …` line to both channels independently when both are set."""
formatted = f"Step: {msg}"
if on_stdout is not None:
on_stdout(formatted)

View file

@ -1,7 +1,11 @@
import os
import re
from pathlib import Path
_INSTANCE_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
DEFAULT_LEFT4ME_ROOT = Path("/var/lib/left4me")
@ -20,6 +24,15 @@ def get_left4me_root() -> Path:
return path
def validate_instance_name(name: str) -> str:
if not _INSTANCE_NAME_RE.fullmatch(name):
raise ValueError(
"instance name must match [a-z0-9][a-z0-9_-]{0,63} "
"(lowercase, no path separators, no whitespace)"
)
return name
def validate_overlay_ref(ref: str) -> str:
stripped = ref.strip()
if stripped != ref:

View file

@ -1,8 +1,14 @@
import subprocess
from pathlib import Path
import pytest
from l4d2host.instances import delete_instance, start_instance
from l4d2host.instances import (
delete_instance,
initialize_instance,
start_instance,
stop_instance,
)
def test_start_order(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
@ -34,7 +40,43 @@ def test_delete_missing_is_noop(tmp_path: Path) -> None:
delete_instance("missing", root=tmp_path)
def test_delete_stopped_instance_removes_dirs_without_unmounting(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
def test_delete_succeeds_when_stop_service_fails(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[list[str]] = []
def fake_run_command(cmd, **kwargs):
del kwargs
calls.append(list(cmd))
if cmd[:2] == ["sudo", "-n"] and "left4me-systemctl" in cmd[2] and "stop" in cmd:
raise subprocess.CalledProcessError(
returncode=5,
cmd=list(cmd),
stderr="Unit left4me-server@alpha.service not loaded.",
)
(tmp_path / "instances" / "alpha").mkdir(parents=True)
(tmp_path / "runtime" / "alpha" / "merged").mkdir(parents=True)
monkeypatch.setattr("l4d2host.instances.run_command", fake_run_command)
monkeypatch.setattr("l4d2host.service_control.run_command", fake_run_command)
delete_instance("alpha", root=tmp_path)
assert not (tmp_path / "instances" / "alpha").exists()
assert not (tmp_path / "runtime" / "alpha").exists()
@pytest.mark.parametrize("bad_name", ["..", "../escape", "foo/bar", " foo", "Foo"])
def test_lifecycle_rejects_unsafe_instance_names(tmp_path: Path, bad_name: str) -> None:
for func in (start_instance, stop_instance, delete_instance):
with pytest.raises(ValueError):
func(bad_name, root=tmp_path)
with pytest.raises(ValueError):
initialize_instance(bad_name, tmp_path / "spec.yaml", root=tmp_path)
assert not (tmp_path / "instances").exists()
assert not (tmp_path / "runtime").exists()
def test_delete_stopped_instance_removes_dirs(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[list[str]] = []
def fake_run_command(cmd, **kwargs):
@ -52,4 +94,29 @@ def test_delete_stopped_instance_removes_dirs_without_unmounting(tmp_path: Path,
assert not (tmp_path / "instances" / "alpha").exists()
assert not (tmp_path / "runtime" / "alpha").exists()
assert ["sudo", "-n", "/usr/local/libexec/left4me/left4me-systemctl", "stop", "alpha"] in calls
assert not any(call[0] == "fusermount3" for call in calls)
def test_delete_succeeds_when_unmount_fails(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
fusermount_calls: list[list[str]] = []
def fake_run_command(cmd, **kwargs):
del kwargs
if cmd and cmd[0] == "fusermount3":
fusermount_calls.append(list(cmd))
raise subprocess.CalledProcessError(
returncode=1,
cmd=list(cmd),
stderr="fusermount3: entry for merged not found in /etc/mtab",
)
(tmp_path / "instances" / "alpha").mkdir(parents=True)
(tmp_path / "runtime" / "alpha" / "merged").mkdir(parents=True)
monkeypatch.setattr("l4d2host.instances.run_command", fake_run_command)
monkeypatch.setattr("l4d2host.service_control.run_command", fake_run_command)
delete_instance("alpha", root=tmp_path)
assert fusermount_calls, "delete must always attempt fusermount3 -u (no preflight)"
assert not (tmp_path / "instances" / "alpha").exists()
assert not (tmp_path / "runtime" / "alpha").exists()

View file

@ -2,7 +2,12 @@ from pathlib import Path
import pytest
from l4d2host.paths import get_left4me_root, overlay_path, validate_overlay_ref
from l4d2host.paths import (
get_left4me_root,
overlay_path,
validate_instance_name,
validate_overlay_ref,
)
def test_get_left4me_root_defaults_to_var_lib_left4me(monkeypatch: pytest.MonkeyPatch) -> None:
@ -46,6 +51,38 @@ def test_validate_overlay_ref_rejects_unsafe_refs(ref: str) -> None:
validate_overlay_ref(ref)
@pytest.mark.parametrize("name", ["srv1", "alpha", "my-server", "srv_01", "a", "a" * 64])
def test_validate_instance_name_accepts_safe_names(name: str) -> None:
assert validate_instance_name(name) == name
@pytest.mark.parametrize(
"name",
[
"",
" ",
"..",
".",
"../foo",
"foo/..",
"foo/bar",
"foo\\bar",
" foo",
"foo ",
"Foo",
"foo bar",
"foo$",
"-foo",
"_foo",
"foo@bar",
"a" * 65,
],
)
def test_validate_instance_name_rejects_unsafe_names(name: str) -> None:
with pytest.raises(ValueError):
validate_instance_name(name)
def test_overlay_path_resolves_under_root(tmp_path: Path) -> None:
assert overlay_path("standard", root=tmp_path) == tmp_path / "overlays" / "standard"

View file

@ -34,6 +34,12 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
if test_config is not None:
app.config.update(test_config)
secret_key = app.config.get("SECRET_KEY")
if not app.config.get("TESTING") and (not secret_key or secret_key == "dev"):
raise RuntimeError("SECRET_KEY must be set to a non-default value outside of testing")
app.config["SESSION_COOKIE_SECURE"] = not app.config.get("TESTING", False)
with app.app_context():
init_db()

View file

@ -2,7 +2,7 @@ import os
DEFAULT_CONFIG: dict[str, object] = {
"SECRET_KEY": "dev",
"SECRET_KEY": None,
"DATABASE_URL": "sqlite:///l4d2web.db",
"STATUS_REFRESH_SECONDS": 8,
"JOB_WORKER_THREADS": 4,
@ -19,7 +19,7 @@ def _bool_from_env(raw: str) -> bool:
def load_config() -> dict[str, object]:
return {
"SECRET_KEY": os.getenv("SECRET_KEY", "dev"),
"SECRET_KEY": os.getenv("SECRET_KEY"),
"DATABASE_URL": os.getenv("DATABASE_URL", "sqlite:///l4d2web.db"),
"STATUS_REFRESH_SECONDS": int(os.getenv("STATUS_REFRESH_SECONDS", "8")),
"JOB_WORKER_THREADS": int(os.getenv("JOB_WORKER_THREADS", "4")),

View file

@ -3,12 +3,13 @@ import time
from flask import Blueprint, Response, redirect, render_template, request
from sqlalchemy import select
from l4d2web.auth import is_safe_next, login_user, logout_user, verify_password
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
from l4d2web.db import session_scope
from l4d2web.models import User
bp = Blueprint("auth", __name__)
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
@ -45,7 +46,9 @@ def login() -> Response:
password = request.form.get("password", "")
with session_scope() as db:
user = db.scalar(select(User).where(User.username == username))
if user is None or not verify_password(password, user.password_digest):
digest = user.password_digest if user is not None else _TIMING_DUMMY_DIGEST
password_ok = verify_password(password, digest)
if user is None or not password_ok:
return Response("invalid credentials", status=401)
login_user(user.id)
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)

View file

@ -6,6 +6,7 @@ from l4d2web.auth import current_user, require_login
from l4d2web.db import session_scope
from l4d2web.models import Blueprint as BlueprintModel
from l4d2web.models import Job, Server
from l4d2web.services.security import validate_instance_name
bp = Blueprint("server", __name__)
@ -19,6 +20,11 @@ def create_server() -> Response:
json_response = request.is_json
payload = request.get_json(silent=True) if json_response else request.form
try:
name = validate_instance_name(str(payload["name"]))
except (KeyError, TypeError, ValueError):
return Response("invalid server name", status=400)
with session_scope() as db:
blueprint = db.scalar(
select(BlueprintModel).where(
@ -32,7 +38,7 @@ def create_server() -> Response:
server = Server(
user_id=user.id,
blueprint_id=blueprint.id,
name=str(payload["name"]),
name=name,
port=int(payload["port"]),
desired_state="stopped",
actual_state="unknown",

View file

@ -52,6 +52,13 @@ def build_scheduler_state(session: Session) -> SchedulerState:
def claim_next_job() -> int | None:
"""Atomically claim the next runnable job.
`_claim_lock` is process-local, so correctness depends on a single web
process. The deployed unit pins `gunicorn --workers 1`; if that ever
changes, replace this with a row-level lock (`with_for_update`) to keep
job claims unique across processes.
"""
with _claim_lock:
with session_scope() as db:
state = build_scheduler_state(db)

View file

@ -1,3 +1,17 @@
import re
_INSTANCE_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
def validate_instance_name(raw: str) -> str:
if not _INSTANCE_NAME_RE.fullmatch(raw):
raise ValueError(
"instance name must match [a-z0-9][a-z0-9_-]{0,63}"
)
return raw
def validate_overlay_ref(raw: str) -> str:
if raw != raw.strip():
raise ValueError("overlay ref must not have leading or trailing whitespace")

View file

@ -50,6 +50,22 @@ def test_login_page_drops_unsafe_encoded_next(client) -> None:
assert "evil.com" not in text
def test_login_calls_verify_password_for_unknown_user(client, monkeypatch) -> None:
calls: list[tuple[str, str]] = []
def tracking_verify(raw: str, digest: str) -> bool:
calls.append((raw, digest))
return False
monkeypatch.setattr("l4d2web.routes.auth_routes.verify_password", tracking_verify)
response = client.post("/login", data={"username": "ghost", "password": "guess"})
assert response.status_code == 401
assert len(calls) == 1, "verify_password must run on unknown users to equalize timing"
assert calls[0][0] == "guess"
def test_signup_routes_are_gone(client) -> None:
assert client.get("/signup").status_code == 404
assert client.post("/signup", data={"username": "alice", "password": "secret"}).status_code == 404

View file

@ -1,4 +1,5 @@
import click
import pytest
from l4d2web.app import create_app
from l4d2web.config import load_config
@ -25,6 +26,42 @@ def test_create_app_does_not_overwrite_database_url_env(tmp_path, monkeypatch) -
assert load_config()["DATABASE_URL"] == "sqlite:///env.db"
def test_create_app_raises_when_secret_key_missing(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'nokey.db'}"
monkeypatch.delenv("SECRET_KEY", raising=False)
with pytest.raises(RuntimeError):
create_app({"TESTING": False, "DATABASE_URL": db_url})
def test_create_app_raises_when_secret_key_is_dev(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'devkey.db'}"
monkeypatch.setenv("SECRET_KEY", "dev")
with pytest.raises(RuntimeError):
create_app({"TESTING": False, "DATABASE_URL": db_url})
def test_session_cookie_secure_in_production(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'cookie.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setattr("l4d2web.app.recover_stale_jobs", lambda: None)
monkeypatch.setattr("l4d2web.app.start_job_workers", lambda app: None)
app = create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "real-secret"})
assert app.config["SESSION_COOKIE_SECURE"] is True
def test_session_cookie_secure_disabled_in_testing(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'cookie-test.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
assert app.config["SESSION_COOKIE_SECURE"] is False
def test_create_app_skips_job_workers_in_cli_context(tmp_path, monkeypatch) -> None:
calls = []
db_url = f"sqlite:///{tmp_path/'cli.db'}"

View file

@ -2,7 +2,7 @@ from l4d2web.app import create_app
def test_health_endpoint() -> None:
app = create_app({"TESTING": True})
app = create_app({"TESTING": True, "SECRET_KEY": "test"})
client = app.test_client()
response = client.get("/health")

View file

@ -114,6 +114,24 @@ def test_create_server_duplicate_port(user_client_with_blueprints) -> None:
assert servers[0].name == "server-1"
@pytest.mark.parametrize("bad_name", ["..", "../escape", "foo/bar", " foo", "Foo"])
def test_create_server_rejects_unsafe_names(user_client_with_blueprints, bad_name: str) -> None:
client, data = user_client_with_blueprints
response = client.post(
"/servers",
data={"name": bad_name, "port": "27015", "blueprint_id": str(data["blueprint_id"])},
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 400
from sqlalchemy import select
from l4d2web.models import Server
with session_scope() as session:
assert session.scalars(select(Server)).all() == []
def test_lifecycle_form_creates_queued_job(user_client_with_blueprints) -> None:
client, data = user_client_with_blueprints
create_response = client.post(