Compare commits

...

4 commits

Author SHA1 Message Date
mwiegand
bbb2b983bc
harden(l4d2web): per-username login rate limit alongside per-IP
A 20-attempts-per-60s budget keyed by IP doesn't slow a distributed brute force that rotates source IPs. Add a parallel per-username bucket with the same threshold so a single account can't burn through more than 20 failed logins/min regardless of where they come from. Empty usernames aren't bucketed (would DoS the anonymous 401 path). Successful login clears both buckets.
2026-05-14 22:26:20 +02:00
mwiegand
0e2a78e065
secure(l4d2web): block non-admin writes on system overlays; last-admin guard on deactivate
_load_files_overlay docs already promised "owner or admin" for mutations, but the check only filtered by overlay.type — system overlays (user_id IS NULL) were writable by any logged-in user. Add the explicit 403 for non-admins; read-only routes remain open across all overlay types.

Mirror the delete-route last-admin guard on /admin/users/<id>/deactivate so a future auth-model change (service accounts bypassing require_admin, etc.) can't accidentally lock out the system.
2026-05-14 22:24:19 +02:00
mwiegand
74b7f61437
harden(l4d2web): default security response headers and generic error handlers
- after_request hook sets X-Content-Type-Options=nosniff, X-Frame-Options=DENY, Referrer-Policy=strict-origin-when-cross-origin, and a strict CSP (default-src 'self', script-src self+nonce, frame-ancestors 'none', form-action 'self'); HSTS added on secure non-test responses
- per-request CSP nonce minted in g.csp_nonce; servers.html's inline showModal script picks it up
- 404 and 500 handlers return short plain-text responses so a misbehaving deployment can't leak tracebacks via Werkzeug's debug page
2026-05-14 22:21:36 +02:00
mwiegand
2902c9cc82
harden(l4d2web): auth/session — clear on login+logout, constant-time CSRF, role-change invalidation
- login_user clears any pre-login session state before stamping user_id/pw_changed_at/admin so a fixated cookie value cannot smuggle data past the login boundary
- logout_user now session.clear()s instead of only popping user_id, removing leftover pw_changed_at/admin markers
- CSRF token comparison uses hmac.compare_digest
- load_current_user rejects sessions where the stamped admin flag no longer matches the user row, preventing a demoted admin from retaining elevated access until next password change (backward-compatible: sessions issued pre-upgrade lack the marker and pass through until next login)
2026-05-14 22:18:46 +02:00
10 changed files with 362 additions and 8 deletions

View file

@ -1,8 +1,9 @@
import hmac
import os
import secrets
import click
from flask import Flask, Response, jsonify, redirect, request, session
from flask import Flask, Response, g, jsonify, redirect, request, session
from l4d2web.auth import current_user, load_current_user
from l4d2web.cli import register_cli
@ -72,10 +73,49 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
return None
token = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token")
if token != session.get("csrf_token"):
expected = session.get("csrf_token")
if not token or not expected or not hmac.compare_digest(token, expected):
return Response("csrf token missing or invalid", status=400)
return None
@app.before_request
def assign_csp_nonce() -> None:
g.csp_nonce = secrets.token_urlsafe(16)
@app.after_request
def set_security_headers(response: Response) -> Response:
response.headers.setdefault("X-Content-Type-Options", "nosniff")
response.headers.setdefault("X-Frame-Options", "DENY")
response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
nonce = getattr(g, "csp_nonce", "")
# script-src nonce blocks inline XSS; style 'unsafe-inline' kept for
# htmx's auto-injected indicator styles. img/data: for SVG icons.
response.headers.setdefault(
"Content-Security-Policy",
"default-src 'self'; "
f"script-src 'self' 'nonce-{nonce}'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data:; "
"connect-src 'self'; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self'",
)
if not app.config.get("TESTING") and request.is_secure:
response.headers.setdefault(
"Strict-Transport-Security",
"max-age=31536000; includeSubDomains",
)
return response
@app.errorhandler(404)
def not_found(_err):
return Response("not found", status=404, mimetype="text/plain")
@app.errorhandler(500)
def internal_error(_err):
return Response("internal server error", status=500, mimetype="text/plain")
app.before_request(load_current_user)
app.register_blueprint(auth_bp)
app.register_blueprint(overlay_bp)

View file

@ -62,6 +62,15 @@ def load_current_user() -> None:
g.user = None
return
# Role change since login → reject. Prevents a demoted admin from
# keeping admin until their next password change. Absence of the marker
# means a pre-upgrade session; let it through and the next login will
# stamp it.
session_admin = session.get("admin")
if session_admin is not None and bool(session_admin) != bool(user.admin):
g.user = None
return
g.user = user
@ -69,13 +78,18 @@ def current_user() -> User | None:
return getattr(g, "user", None)
def login_user(user_id: int, password_changed_at) -> None:
def login_user(user_id: int, password_changed_at, admin: bool) -> None:
# Drop any pre-login session state so a fixated cookie value can't smuggle
# data past the login boundary. The next request's csrf_protect hook will
# mint a fresh csrf_token.
session.clear()
session["user_id"] = user_id
session["pw_changed_at"] = password_changed_at.isoformat()
session["admin"] = bool(admin)
def logout_user() -> None:
session.pop("user_id", None)
session.clear()
def is_safe_next(target: str | None) -> bool:

View file

@ -12,10 +12,12 @@ _TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
LOGIN_ATTEMPTS_BY_USERNAME: dict[str, list[float]] = {}
def reset_login_rate_limits() -> None:
LOGIN_ATTEMPTS_BY_IP.clear()
LOGIN_ATTEMPTS_BY_USERNAME.clear()
def is_login_rate_limited(remote_addr: str) -> bool:
@ -27,6 +29,19 @@ def is_login_rate_limited(remote_addr: str) -> bool:
)
def is_username_rate_limited(username: str) -> bool:
# Empty usernames aren't bucketed — every anonymous probe would share
# one bucket and DoS the empty-username 401 path for everyone.
if not username:
return False
return check_rate_limit(
LOGIN_ATTEMPTS_BY_USERNAME,
username,
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
)
@bp.get("/login")
def login_form() -> str:
next_target = request.args.get("next", "")
@ -41,6 +56,8 @@ def login() -> Response:
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
if is_username_rate_limited(username):
return Response("too many login attempts", status=429)
with session_scope() as db:
user = db.scalar(select(User).where(User.username == username))
digest = user.password_digest if user is not None else _TIMING_DUMMY_DIGEST
@ -49,8 +66,9 @@ def login() -> Response:
# Same generic response for missing user, wrong password, or
# deactivated account — no timing oracle for deactivation status.
return Response("invalid credentials", status=401)
login_user(user.id, user.password_changed_at)
login_user(user.id, user.password_changed_at, user.admin)
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
LOGIN_ATTEMPTS_BY_USERNAME.pop(username, None)
next_target = request.form.get("next", "")
return redirect(next_target if is_safe_next(next_target) else "/dashboard")

View file

@ -88,13 +88,16 @@ def _load_overlay_for_user(overlay_id: int, user) -> Overlay | Response:
def _load_files_overlay(overlay_id: int, user) -> Overlay | Response:
"""Like `_load_overlay_for_user`, but additionally 404s for any overlay
whose type isn't `files`. Mutating endpoints use this; read-only ones
keep working across all types."""
whose type isn't `files`, and 403s when a non-admin tries to mutate a
system overlay (`user_id IS NULL`). Read-only endpoints keep working
across all overlay types for everyone."""
result = _load_overlay_for_user(overlay_id, user)
if isinstance(result, Response):
return result
if result.type != "files":
return Response(status=404)
if result.user_id is None and not user.admin:
return Response(status=403)
return result

View file

@ -69,6 +69,19 @@ def admin_users_deactivate(user_id: int) -> Response:
target = db.scalar(select(User).where(User.id == user_id))
if target is None:
return Response(status=404)
# Defense-in-depth: mirror the last-admin guard from delete. Through
# normal flow this is unreachable (self-deactivate refused above), but
# it survives future auth-model changes like service accounts.
if target.admin and target.active:
other_active_admins = db.scalar(
select(func.count(User.id)).where(
User.admin.is_(True),
User.active.is_(True),
User.id != user_id,
)
) or 0
if other_active_admins == 0:
return Response("cannot deactivate the last active admin", status=409)
target.active = False
return redirect("/admin/users")

View file

@ -73,7 +73,7 @@
{% endif %}
{% if prefill_blueprint_id %}
<script>
<script nonce="{{ g.csp_nonce }}">
document.addEventListener("DOMContentLoaded", () => {
const dialog = document.getElementById("create-server-modal");
if (dialog && typeof dialog.showModal === "function") {

View file

@ -78,6 +78,21 @@ def test_deactivate_flips_active_false(admin_client):
assert _user_active(target) is False
def test_deactivate_other_admin_succeeds_when_more_than_one_admin(admin_client):
"""Defense-in-depth mirror of the delete-admin test. The fixture creates
two admins; admin can deactivate admin2 because at least one admin (the
actor) remains."""
client, _ = admin_client
with session_scope() as db:
admin2 = db.scalar(select(User).where(User.username == "admin2"))
admin2_id = admin2.id
response = _post(client, f"/admin/users/{admin2_id}/deactivate")
assert response.status_code == 302
assert _user_active(admin2_id) is False
def test_activate_flips_active_true(admin_client):
client, _ = admin_client
target = _add_user("bob", active=False)

View file

@ -1,4 +1,5 @@
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -199,6 +200,79 @@ def test_login_stamps_password_changed_at_in_session(client) -> None:
assert marker == user.password_changed_at.isoformat()
def test_login_clears_pre_login_session_state(client) -> None:
"""Pre-login session keys must not survive into the authenticated session,
so a fixated cookie value cannot smuggle state past the login boundary."""
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))
with client.session_transaction() as sess:
sess["pre_login_marker"] = "smuggled"
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
with client.session_transaction() as sess:
assert sess.get("user_id") is not None
assert "pre_login_marker" not in sess
def test_logout_clears_session_completely(client) -> None:
"""logout_user must drop pw_changed_at alongside user_id so no auth
markers linger after sign-out."""
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))
client.post("/login", data={"username": "alice", "password": "secret"})
with client.session_transaction() as sess:
assert sess.get("user_id") is not None
assert sess.get("pw_changed_at") is not None
sess["csrf_token"] = "test-token"
client.post("/logout", data={"csrf_token": "test-token"})
with client.session_transaction() as sess:
assert "user_id" not in sess
assert "pw_changed_at" not in sess
def test_load_current_user_rejects_role_change(client) -> None:
"""If a user's admin flag changes after login, the existing session must
be rejected on the next request preventing demoted admins from
retaining elevated access until their next password change."""
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"), admin=True)
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
sess["admin"] = True
# Demote outside the session.
with session_scope() as db:
target = db.scalar(select(User).where(User.id == uid))
target.admin = False
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_login_stamps_admin_flag_in_session(client) -> None:
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret"), admin=True))
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
with client.session_transaction() as sess:
assert sess.get("admin") is True
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)

View file

@ -64,6 +64,70 @@ def _make_overlay(left4me_root: Path, *, user_id: int | None, name: str) -> int:
return overlay_id
def _make_files_overlay(left4me_root: Path, *, user_id: int | None, name: str) -> int:
with session_scope() as s:
overlay = Overlay(name=name, path="", type="files", user_id=user_id)
s.add(overlay)
s.flush()
overlay.path = str(overlay.id)
overlay_id = overlay.id
(left4me_root / "overlays" / str(overlay_id)).mkdir(parents=True)
return overlay_id
def test_non_admin_cannot_mkdir_in_system_overlay(app, left4me_root: Path) -> None:
"""System overlays (user_id IS NULL) are readable by everyone but only
writable by admins. A regular user attempting to mutate gets 403."""
user_id = _make_user(username="bob", admin=False)
overlay_id = _make_files_overlay(left4me_root, user_id=None, name="system")
client = _client_for(app, user_id)
response = client.post(
f"/overlays/{overlay_id}/files/mkdir",
json={"path": "newdir"},
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 403
def test_non_admin_can_still_read_system_overlay(app, left4me_root: Path) -> None:
user_id = _make_user(username="bob", admin=False)
overlay_id = _make_files_overlay(left4me_root, user_id=None, name="system")
(left4me_root / "overlays" / str(overlay_id) / "config.cfg").write_text("ok")
client = _client_for(app, user_id)
response = client.get(f"/overlays/{overlay_id}/files")
assert response.status_code == 200
assert b"config.cfg" in response.data
def test_admin_can_mkdir_in_system_overlay(app, left4me_root: Path) -> None:
admin_id = _make_user(username="admin", admin=True)
overlay_id = _make_files_overlay(left4me_root, user_id=None, name="system")
client = _client_for(app, admin_id)
response = client.post(
f"/overlays/{overlay_id}/files/mkdir",
json={"path": "newdir"},
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code in (200, 201, 204)
assert (left4me_root / "overlays" / str(overlay_id) / "newdir").is_dir()
def test_owner_can_mkdir_in_own_overlay(app, left4me_root: Path) -> None:
user_id = _make_user(username="bob", admin=False)
overlay_id = _make_files_overlay(left4me_root, user_id=user_id, name="mine")
client = _client_for(app, user_id)
response = client.post(
f"/overlays/{overlay_id}/files/mkdir",
json={"path": "newdir"},
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code in (200, 201, 204)
def test_files_fragment_lists_root_directory(app, left4me_root: Path) -> None:
user_id = _make_user()
overlay_id = _make_overlay(left4me_root, user_id=user_id, name="my")

View file

@ -28,3 +28,116 @@ def test_login_rate_limit(client) -> None:
response = client.post("/login", data={"username": "x", "password": "y"})
assert response.status_code == 429
def test_login_rate_limit_per_username_across_ips(client) -> None:
"""A distributed brute-force that rotates the source IP must still be
slowed once the per-username budget for a single account is exhausted."""
for i in range(20):
client.post(
"/login",
data={"username": "alice", "password": "wrong"},
environ_overrides={"REMOTE_ADDR": f"10.0.0.{i}"},
)
response = client.post(
"/login",
data={"username": "alice", "password": "wrong"},
environ_overrides={"REMOTE_ADDR": "10.99.99.99"},
)
assert response.status_code == 429
def test_login_rate_limit_empty_username_does_not_bucket(client) -> None:
"""Anonymous probes with no username must not all share one bucket — that
would let an attacker DoS legitimate empty-username 401s. We just don't
track them per-username."""
for i in range(25):
client.post(
"/login",
data={"username": "", "password": "wrong"},
environ_overrides={"REMOTE_ADDR": f"10.1.0.{i}"},
)
# A real username from a fresh IP must still get through to the
# 401 path (i.e. not be 429ed by empty-username spillover).
response = client.post(
"/login",
data={"username": "alice", "password": "wrong"},
environ_overrides={"REMOTE_ADDR": "10.2.0.1"},
)
assert response.status_code == 401
def test_security_headers_present(client) -> None:
response = client.get("/login")
assert response.headers.get("X-Content-Type-Options") == "nosniff"
assert response.headers.get("X-Frame-Options") == "DENY"
assert response.headers.get("Referrer-Policy") == "strict-origin-when-cross-origin"
csp = response.headers.get("Content-Security-Policy")
assert csp is not None
assert "default-src 'self'" in csp
assert "frame-ancestors 'none'" in csp
assert "form-action 'self'" in csp
def test_csp_nonce_matches_inline_script(client) -> None:
"""Pages that emit inline <script> blocks must tag them with the
per-request CSP nonce, otherwise the browser blocks them."""
import re
with client.session_transaction() as sess:
sess["csrf_token"] = "tok"
# Log in so we can hit /servers.
with session_scope() as db:
from sqlalchemy import select
alice = db.scalar(select(User).where(User.username == "alice"))
alice_id = alice.id
marker = alice.password_changed_at.isoformat()
with client.session_transaction() as sess:
sess["user_id"] = alice_id
sess["pw_changed_at"] = marker
sess["admin"] = False
response = client.get("/servers?prefill_blueprint_id=1")
csp = response.headers.get("Content-Security-Policy", "")
nonce_match = re.search(r"'nonce-([^']+)'", csp)
assert nonce_match, f"expected nonce in CSP, got: {csp}"
nonce = nonce_match.group(1)
body = response.get_data(as_text=True)
if "<script>" in body:
# An un-nonced inline script slipped through.
pytest.fail("inline <script> without nonce attribute")
if "prefill_blueprint_id" in body and "showModal" in body:
assert f'nonce="{nonce}"' in body
def test_404_response_is_generic(client) -> None:
response = client.get("/this-route-does-not-exist")
assert response.status_code == 404
body = response.get_data(as_text=True)
assert "Traceback" not in body
assert "werkzeug" not in body.lower()
def test_500_response_does_not_leak_traceback(tmp_path, monkeypatch) -> None:
from l4d2web.app import create_app
db_url = f"sqlite:///{tmp_path/'boom.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
boom_app = create_app({
"TESTING": True,
"DATABASE_URL": db_url,
"SECRET_KEY": "test",
"PROPAGATE_EXCEPTIONS": False,
})
@boom_app.route("/__boom__")
def boom():
raise RuntimeError("internal detail that must not leak")
response = boom_app.test_client().get("/__boom__")
assert response.status_code == 500
body = response.get_data(as_text=True)
assert "internal detail" not in body
assert "Traceback" not in body