Compare commits
No commits in common. "bbb2b983bc899a348143f5ea7cf91db7d5070bff" and "66d14feca5b20c4fc7fea2bb075f41556ebb057b" have entirely different histories.
bbb2b983bc
...
66d14feca5
10 changed files with 8 additions and 362 deletions
|
|
@ -1,9 +1,8 @@
|
||||||
import hmac
|
|
||||||
import os
|
import os
|
||||||
import secrets
|
import secrets
|
||||||
|
|
||||||
import click
|
import click
|
||||||
from flask import Flask, Response, g, jsonify, redirect, request, session
|
from flask import Flask, Response, jsonify, redirect, request, session
|
||||||
|
|
||||||
from l4d2web.auth import current_user, load_current_user
|
from l4d2web.auth import current_user, load_current_user
|
||||||
from l4d2web.cli import register_cli
|
from l4d2web.cli import register_cli
|
||||||
|
|
@ -73,49 +72,10 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
token = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token")
|
token = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token")
|
||||||
expected = session.get("csrf_token")
|
if token != session.get("csrf_token"):
|
||||||
if not token or not expected or not hmac.compare_digest(token, expected):
|
|
||||||
return Response("csrf token missing or invalid", status=400)
|
return Response("csrf token missing or invalid", status=400)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@app.before_request
|
|
||||||
def assign_csp_nonce() -> None:
|
|
||||||
g.csp_nonce = secrets.token_urlsafe(16)
|
|
||||||
|
|
||||||
@app.after_request
|
|
||||||
def set_security_headers(response: Response) -> Response:
|
|
||||||
response.headers.setdefault("X-Content-Type-Options", "nosniff")
|
|
||||||
response.headers.setdefault("X-Frame-Options", "DENY")
|
|
||||||
response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
|
|
||||||
nonce = getattr(g, "csp_nonce", "")
|
|
||||||
# script-src nonce blocks inline XSS; style 'unsafe-inline' kept for
|
|
||||||
# htmx's auto-injected indicator styles. img/data: for SVG icons.
|
|
||||||
response.headers.setdefault(
|
|
||||||
"Content-Security-Policy",
|
|
||||||
"default-src 'self'; "
|
|
||||||
f"script-src 'self' 'nonce-{nonce}'; "
|
|
||||||
"style-src 'self' 'unsafe-inline'; "
|
|
||||||
"img-src 'self' data:; "
|
|
||||||
"connect-src 'self'; "
|
|
||||||
"frame-ancestors 'none'; "
|
|
||||||
"base-uri 'self'; "
|
|
||||||
"form-action 'self'",
|
|
||||||
)
|
|
||||||
if not app.config.get("TESTING") and request.is_secure:
|
|
||||||
response.headers.setdefault(
|
|
||||||
"Strict-Transport-Security",
|
|
||||||
"max-age=31536000; includeSubDomains",
|
|
||||||
)
|
|
||||||
return response
|
|
||||||
|
|
||||||
@app.errorhandler(404)
|
|
||||||
def not_found(_err):
|
|
||||||
return Response("not found", status=404, mimetype="text/plain")
|
|
||||||
|
|
||||||
@app.errorhandler(500)
|
|
||||||
def internal_error(_err):
|
|
||||||
return Response("internal server error", status=500, mimetype="text/plain")
|
|
||||||
|
|
||||||
app.before_request(load_current_user)
|
app.before_request(load_current_user)
|
||||||
app.register_blueprint(auth_bp)
|
app.register_blueprint(auth_bp)
|
||||||
app.register_blueprint(overlay_bp)
|
app.register_blueprint(overlay_bp)
|
||||||
|
|
|
||||||
|
|
@ -62,15 +62,6 @@ def load_current_user() -> None:
|
||||||
g.user = None
|
g.user = None
|
||||||
return
|
return
|
||||||
|
|
||||||
# Role change since login → reject. Prevents a demoted admin from
|
|
||||||
# keeping admin until their next password change. Absence of the marker
|
|
||||||
# means a pre-upgrade session; let it through and the next login will
|
|
||||||
# stamp it.
|
|
||||||
session_admin = session.get("admin")
|
|
||||||
if session_admin is not None and bool(session_admin) != bool(user.admin):
|
|
||||||
g.user = None
|
|
||||||
return
|
|
||||||
|
|
||||||
g.user = user
|
g.user = user
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -78,18 +69,13 @@ def current_user() -> User | None:
|
||||||
return getattr(g, "user", None)
|
return getattr(g, "user", None)
|
||||||
|
|
||||||
|
|
||||||
def login_user(user_id: int, password_changed_at, admin: bool) -> None:
|
def login_user(user_id: int, password_changed_at) -> None:
|
||||||
# Drop any pre-login session state so a fixated cookie value can't smuggle
|
|
||||||
# data past the login boundary. The next request's csrf_protect hook will
|
|
||||||
# mint a fresh csrf_token.
|
|
||||||
session.clear()
|
|
||||||
session["user_id"] = user_id
|
session["user_id"] = user_id
|
||||||
session["pw_changed_at"] = password_changed_at.isoformat()
|
session["pw_changed_at"] = password_changed_at.isoformat()
|
||||||
session["admin"] = bool(admin)
|
|
||||||
|
|
||||||
|
|
||||||
def logout_user() -> None:
|
def logout_user() -> None:
|
||||||
session.clear()
|
session.pop("user_id", None)
|
||||||
|
|
||||||
|
|
||||||
def is_safe_next(target: str | None) -> bool:
|
def is_safe_next(target: str | None) -> bool:
|
||||||
|
|
|
||||||
|
|
@ -12,12 +12,10 @@ _TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
|
||||||
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||||
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
||||||
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||||
LOGIN_ATTEMPTS_BY_USERNAME: dict[str, list[float]] = {}
|
|
||||||
|
|
||||||
|
|
||||||
def reset_login_rate_limits() -> None:
|
def reset_login_rate_limits() -> None:
|
||||||
LOGIN_ATTEMPTS_BY_IP.clear()
|
LOGIN_ATTEMPTS_BY_IP.clear()
|
||||||
LOGIN_ATTEMPTS_BY_USERNAME.clear()
|
|
||||||
|
|
||||||
|
|
||||||
def is_login_rate_limited(remote_addr: str) -> bool:
|
def is_login_rate_limited(remote_addr: str) -> bool:
|
||||||
|
|
@ -29,19 +27,6 @@ def is_login_rate_limited(remote_addr: str) -> bool:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def is_username_rate_limited(username: str) -> bool:
|
|
||||||
# Empty usernames aren't bucketed — every anonymous probe would share
|
|
||||||
# one bucket and DoS the empty-username 401 path for everyone.
|
|
||||||
if not username:
|
|
||||||
return False
|
|
||||||
return check_rate_limit(
|
|
||||||
LOGIN_ATTEMPTS_BY_USERNAME,
|
|
||||||
username,
|
|
||||||
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
|
|
||||||
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@bp.get("/login")
|
@bp.get("/login")
|
||||||
def login_form() -> str:
|
def login_form() -> str:
|
||||||
next_target = request.args.get("next", "")
|
next_target = request.args.get("next", "")
|
||||||
|
|
@ -56,8 +41,6 @@ def login() -> Response:
|
||||||
|
|
||||||
username = request.form.get("username", "").strip()
|
username = request.form.get("username", "").strip()
|
||||||
password = request.form.get("password", "")
|
password = request.form.get("password", "")
|
||||||
if is_username_rate_limited(username):
|
|
||||||
return Response("too many login attempts", status=429)
|
|
||||||
with session_scope() as db:
|
with session_scope() as db:
|
||||||
user = db.scalar(select(User).where(User.username == username))
|
user = db.scalar(select(User).where(User.username == username))
|
||||||
digest = user.password_digest if user is not None else _TIMING_DUMMY_DIGEST
|
digest = user.password_digest if user is not None else _TIMING_DUMMY_DIGEST
|
||||||
|
|
@ -66,9 +49,8 @@ def login() -> Response:
|
||||||
# Same generic response for missing user, wrong password, or
|
# Same generic response for missing user, wrong password, or
|
||||||
# deactivated account — no timing oracle for deactivation status.
|
# deactivated account — no timing oracle for deactivation status.
|
||||||
return Response("invalid credentials", status=401)
|
return Response("invalid credentials", status=401)
|
||||||
login_user(user.id, user.password_changed_at, user.admin)
|
login_user(user.id, user.password_changed_at)
|
||||||
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
||||||
LOGIN_ATTEMPTS_BY_USERNAME.pop(username, None)
|
|
||||||
next_target = request.form.get("next", "")
|
next_target = request.form.get("next", "")
|
||||||
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -88,16 +88,13 @@ def _load_overlay_for_user(overlay_id: int, user) -> Overlay | Response:
|
||||||
|
|
||||||
def _load_files_overlay(overlay_id: int, user) -> Overlay | Response:
|
def _load_files_overlay(overlay_id: int, user) -> Overlay | Response:
|
||||||
"""Like `_load_overlay_for_user`, but additionally 404s for any overlay
|
"""Like `_load_overlay_for_user`, but additionally 404s for any overlay
|
||||||
whose type isn't `files`, and 403s when a non-admin tries to mutate a
|
whose type isn't `files`. Mutating endpoints use this; read-only ones
|
||||||
system overlay (`user_id IS NULL`). Read-only endpoints keep working
|
keep working across all types."""
|
||||||
across all overlay types for everyone."""
|
|
||||||
result = _load_overlay_for_user(overlay_id, user)
|
result = _load_overlay_for_user(overlay_id, user)
|
||||||
if isinstance(result, Response):
|
if isinstance(result, Response):
|
||||||
return result
|
return result
|
||||||
if result.type != "files":
|
if result.type != "files":
|
||||||
return Response(status=404)
|
return Response(status=404)
|
||||||
if result.user_id is None and not user.admin:
|
|
||||||
return Response(status=403)
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -69,19 +69,6 @@ def admin_users_deactivate(user_id: int) -> Response:
|
||||||
target = db.scalar(select(User).where(User.id == user_id))
|
target = db.scalar(select(User).where(User.id == user_id))
|
||||||
if target is None:
|
if target is None:
|
||||||
return Response(status=404)
|
return Response(status=404)
|
||||||
# Defense-in-depth: mirror the last-admin guard from delete. Through
|
|
||||||
# normal flow this is unreachable (self-deactivate refused above), but
|
|
||||||
# it survives future auth-model changes like service accounts.
|
|
||||||
if target.admin and target.active:
|
|
||||||
other_active_admins = db.scalar(
|
|
||||||
select(func.count(User.id)).where(
|
|
||||||
User.admin.is_(True),
|
|
||||||
User.active.is_(True),
|
|
||||||
User.id != user_id,
|
|
||||||
)
|
|
||||||
) or 0
|
|
||||||
if other_active_admins == 0:
|
|
||||||
return Response("cannot deactivate the last active admin", status=409)
|
|
||||||
target.active = False
|
target.active = False
|
||||||
return redirect("/admin/users")
|
return redirect("/admin/users")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -73,7 +73,7 @@
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
||||||
{% if prefill_blueprint_id %}
|
{% if prefill_blueprint_id %}
|
||||||
<script nonce="{{ g.csp_nonce }}">
|
<script>
|
||||||
document.addEventListener("DOMContentLoaded", () => {
|
document.addEventListener("DOMContentLoaded", () => {
|
||||||
const dialog = document.getElementById("create-server-modal");
|
const dialog = document.getElementById("create-server-modal");
|
||||||
if (dialog && typeof dialog.showModal === "function") {
|
if (dialog && typeof dialog.showModal === "function") {
|
||||||
|
|
|
||||||
|
|
@ -78,21 +78,6 @@ def test_deactivate_flips_active_false(admin_client):
|
||||||
assert _user_active(target) is False
|
assert _user_active(target) is False
|
||||||
|
|
||||||
|
|
||||||
def test_deactivate_other_admin_succeeds_when_more_than_one_admin(admin_client):
|
|
||||||
"""Defense-in-depth mirror of the delete-admin test. The fixture creates
|
|
||||||
two admins; admin can deactivate admin2 because at least one admin (the
|
|
||||||
actor) remains."""
|
|
||||||
client, _ = admin_client
|
|
||||||
with session_scope() as db:
|
|
||||||
admin2 = db.scalar(select(User).where(User.username == "admin2"))
|
|
||||||
admin2_id = admin2.id
|
|
||||||
|
|
||||||
response = _post(client, f"/admin/users/{admin2_id}/deactivate")
|
|
||||||
|
|
||||||
assert response.status_code == 302
|
|
||||||
assert _user_active(admin2_id) is False
|
|
||||||
|
|
||||||
|
|
||||||
def test_activate_flips_active_true(admin_client):
|
def test_activate_flips_active_true(admin_client):
|
||||||
client, _ = admin_client
|
client, _ = admin_client
|
||||||
target = _add_user("bob", active=False)
|
target = _add_user("bob", active=False)
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,4 @@
|
||||||
import pytest
|
import pytest
|
||||||
from sqlalchemy import select
|
|
||||||
|
|
||||||
from l4d2web.app import create_app
|
from l4d2web.app import create_app
|
||||||
from l4d2web.auth import hash_password
|
from l4d2web.auth import hash_password
|
||||||
|
|
@ -200,79 +199,6 @@ def test_login_stamps_password_changed_at_in_session(client) -> None:
|
||||||
assert marker == user.password_changed_at.isoformat()
|
assert marker == user.password_changed_at.isoformat()
|
||||||
|
|
||||||
|
|
||||||
def test_login_clears_pre_login_session_state(client) -> None:
|
|
||||||
"""Pre-login session keys must not survive into the authenticated session,
|
|
||||||
so a fixated cookie value cannot smuggle state past the login boundary."""
|
|
||||||
with session_scope() as session:
|
|
||||||
session.add(User(username="alice", password_digest=hash_password("secret")))
|
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
sess["pre_login_marker"] = "smuggled"
|
|
||||||
|
|
||||||
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
|
||||||
assert response.status_code == 302
|
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
assert sess.get("user_id") is not None
|
|
||||||
assert "pre_login_marker" not in sess
|
|
||||||
|
|
||||||
|
|
||||||
def test_logout_clears_session_completely(client) -> None:
|
|
||||||
"""logout_user must drop pw_changed_at alongside user_id so no auth
|
|
||||||
markers linger after sign-out."""
|
|
||||||
with session_scope() as session:
|
|
||||||
session.add(User(username="alice", password_digest=hash_password("secret")))
|
|
||||||
|
|
||||||
client.post("/login", data={"username": "alice", "password": "secret"})
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
assert sess.get("user_id") is not None
|
|
||||||
assert sess.get("pw_changed_at") is not None
|
|
||||||
sess["csrf_token"] = "test-token"
|
|
||||||
|
|
||||||
client.post("/logout", data={"csrf_token": "test-token"})
|
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
assert "user_id" not in sess
|
|
||||||
assert "pw_changed_at" not in sess
|
|
||||||
|
|
||||||
|
|
||||||
def test_load_current_user_rejects_role_change(client) -> None:
|
|
||||||
"""If a user's admin flag changes after login, the existing session must
|
|
||||||
be rejected on the next request — preventing demoted admins from
|
|
||||||
retaining elevated access until their next password change."""
|
|
||||||
with session_scope() as db:
|
|
||||||
u = User(username="alice", password_digest=hash_password("secret"), admin=True)
|
|
||||||
db.add(u)
|
|
||||||
db.flush()
|
|
||||||
uid = u.id
|
|
||||||
marker = u.password_changed_at.isoformat()
|
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
sess["user_id"] = uid
|
|
||||||
sess["pw_changed_at"] = marker
|
|
||||||
sess["admin"] = True
|
|
||||||
|
|
||||||
# Demote outside the session.
|
|
||||||
with session_scope() as db:
|
|
||||||
target = db.scalar(select(User).where(User.id == uid))
|
|
||||||
target.admin = False
|
|
||||||
|
|
||||||
response = client.get("/dashboard")
|
|
||||||
assert response.status_code == 302
|
|
||||||
assert "/login" in response.headers["Location"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_stamps_admin_flag_in_session(client) -> None:
|
|
||||||
with session_scope() as session:
|
|
||||||
session.add(User(username="alice", password_digest=hash_password("secret"), admin=True))
|
|
||||||
|
|
||||||
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
|
||||||
assert response.status_code == 302
|
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
assert sess.get("admin") is True
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
|
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
|
||||||
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
|
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
|
||||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
|
|
|
||||||
|
|
@ -64,70 +64,6 @@ def _make_overlay(left4me_root: Path, *, user_id: int | None, name: str) -> int:
|
||||||
return overlay_id
|
return overlay_id
|
||||||
|
|
||||||
|
|
||||||
def _make_files_overlay(left4me_root: Path, *, user_id: int | None, name: str) -> int:
|
|
||||||
with session_scope() as s:
|
|
||||||
overlay = Overlay(name=name, path="", type="files", user_id=user_id)
|
|
||||||
s.add(overlay)
|
|
||||||
s.flush()
|
|
||||||
overlay.path = str(overlay.id)
|
|
||||||
overlay_id = overlay.id
|
|
||||||
(left4me_root / "overlays" / str(overlay_id)).mkdir(parents=True)
|
|
||||||
return overlay_id
|
|
||||||
|
|
||||||
|
|
||||||
def test_non_admin_cannot_mkdir_in_system_overlay(app, left4me_root: Path) -> None:
|
|
||||||
"""System overlays (user_id IS NULL) are readable by everyone but only
|
|
||||||
writable by admins. A regular user attempting to mutate gets 403."""
|
|
||||||
user_id = _make_user(username="bob", admin=False)
|
|
||||||
overlay_id = _make_files_overlay(left4me_root, user_id=None, name="system")
|
|
||||||
|
|
||||||
client = _client_for(app, user_id)
|
|
||||||
response = client.post(
|
|
||||||
f"/overlays/{overlay_id}/files/mkdir",
|
|
||||||
json={"path": "newdir"},
|
|
||||||
headers={"X-CSRF-Token": "test-token"},
|
|
||||||
)
|
|
||||||
assert response.status_code == 403
|
|
||||||
|
|
||||||
|
|
||||||
def test_non_admin_can_still_read_system_overlay(app, left4me_root: Path) -> None:
|
|
||||||
user_id = _make_user(username="bob", admin=False)
|
|
||||||
overlay_id = _make_files_overlay(left4me_root, user_id=None, name="system")
|
|
||||||
(left4me_root / "overlays" / str(overlay_id) / "config.cfg").write_text("ok")
|
|
||||||
|
|
||||||
client = _client_for(app, user_id)
|
|
||||||
response = client.get(f"/overlays/{overlay_id}/files")
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert b"config.cfg" in response.data
|
|
||||||
|
|
||||||
|
|
||||||
def test_admin_can_mkdir_in_system_overlay(app, left4me_root: Path) -> None:
|
|
||||||
admin_id = _make_user(username="admin", admin=True)
|
|
||||||
overlay_id = _make_files_overlay(left4me_root, user_id=None, name="system")
|
|
||||||
|
|
||||||
client = _client_for(app, admin_id)
|
|
||||||
response = client.post(
|
|
||||||
f"/overlays/{overlay_id}/files/mkdir",
|
|
||||||
json={"path": "newdir"},
|
|
||||||
headers={"X-CSRF-Token": "test-token"},
|
|
||||||
)
|
|
||||||
assert response.status_code in (200, 201, 204)
|
|
||||||
assert (left4me_root / "overlays" / str(overlay_id) / "newdir").is_dir()
|
|
||||||
|
|
||||||
|
|
||||||
def test_owner_can_mkdir_in_own_overlay(app, left4me_root: Path) -> None:
|
|
||||||
user_id = _make_user(username="bob", admin=False)
|
|
||||||
overlay_id = _make_files_overlay(left4me_root, user_id=user_id, name="mine")
|
|
||||||
|
|
||||||
client = _client_for(app, user_id)
|
|
||||||
response = client.post(
|
|
||||||
f"/overlays/{overlay_id}/files/mkdir",
|
|
||||||
json={"path": "newdir"},
|
|
||||||
headers={"X-CSRF-Token": "test-token"},
|
|
||||||
)
|
|
||||||
assert response.status_code in (200, 201, 204)
|
|
||||||
|
|
||||||
|
|
||||||
def test_files_fragment_lists_root_directory(app, left4me_root: Path) -> None:
|
def test_files_fragment_lists_root_directory(app, left4me_root: Path) -> None:
|
||||||
user_id = _make_user()
|
user_id = _make_user()
|
||||||
overlay_id = _make_overlay(left4me_root, user_id=user_id, name="my")
|
overlay_id = _make_overlay(left4me_root, user_id=user_id, name="my")
|
||||||
|
|
|
||||||
|
|
@ -28,116 +28,3 @@ def test_login_rate_limit(client) -> None:
|
||||||
|
|
||||||
response = client.post("/login", data={"username": "x", "password": "y"})
|
response = client.post("/login", data={"username": "x", "password": "y"})
|
||||||
assert response.status_code == 429
|
assert response.status_code == 429
|
||||||
|
|
||||||
|
|
||||||
def test_login_rate_limit_per_username_across_ips(client) -> None:
|
|
||||||
"""A distributed brute-force that rotates the source IP must still be
|
|
||||||
slowed once the per-username budget for a single account is exhausted."""
|
|
||||||
for i in range(20):
|
|
||||||
client.post(
|
|
||||||
"/login",
|
|
||||||
data={"username": "alice", "password": "wrong"},
|
|
||||||
environ_overrides={"REMOTE_ADDR": f"10.0.0.{i}"},
|
|
||||||
)
|
|
||||||
|
|
||||||
response = client.post(
|
|
||||||
"/login",
|
|
||||||
data={"username": "alice", "password": "wrong"},
|
|
||||||
environ_overrides={"REMOTE_ADDR": "10.99.99.99"},
|
|
||||||
)
|
|
||||||
assert response.status_code == 429
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_rate_limit_empty_username_does_not_bucket(client) -> None:
|
|
||||||
"""Anonymous probes with no username must not all share one bucket — that
|
|
||||||
would let an attacker DoS legitimate empty-username 401s. We just don't
|
|
||||||
track them per-username."""
|
|
||||||
for i in range(25):
|
|
||||||
client.post(
|
|
||||||
"/login",
|
|
||||||
data={"username": "", "password": "wrong"},
|
|
||||||
environ_overrides={"REMOTE_ADDR": f"10.1.0.{i}"},
|
|
||||||
)
|
|
||||||
|
|
||||||
# A real username from a fresh IP must still get through to the
|
|
||||||
# 401 path (i.e. not be 429ed by empty-username spillover).
|
|
||||||
response = client.post(
|
|
||||||
"/login",
|
|
||||||
data={"username": "alice", "password": "wrong"},
|
|
||||||
environ_overrides={"REMOTE_ADDR": "10.2.0.1"},
|
|
||||||
)
|
|
||||||
assert response.status_code == 401
|
|
||||||
|
|
||||||
|
|
||||||
def test_security_headers_present(client) -> None:
|
|
||||||
response = client.get("/login")
|
|
||||||
assert response.headers.get("X-Content-Type-Options") == "nosniff"
|
|
||||||
assert response.headers.get("X-Frame-Options") == "DENY"
|
|
||||||
assert response.headers.get("Referrer-Policy") == "strict-origin-when-cross-origin"
|
|
||||||
csp = response.headers.get("Content-Security-Policy")
|
|
||||||
assert csp is not None
|
|
||||||
assert "default-src 'self'" in csp
|
|
||||||
assert "frame-ancestors 'none'" in csp
|
|
||||||
assert "form-action 'self'" in csp
|
|
||||||
|
|
||||||
|
|
||||||
def test_csp_nonce_matches_inline_script(client) -> None:
|
|
||||||
"""Pages that emit inline <script> blocks must tag them with the
|
|
||||||
per-request CSP nonce, otherwise the browser blocks them."""
|
|
||||||
import re
|
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
sess["csrf_token"] = "tok"
|
|
||||||
|
|
||||||
# Log in so we can hit /servers.
|
|
||||||
with session_scope() as db:
|
|
||||||
from sqlalchemy import select
|
|
||||||
alice = db.scalar(select(User).where(User.username == "alice"))
|
|
||||||
alice_id = alice.id
|
|
||||||
marker = alice.password_changed_at.isoformat()
|
|
||||||
with client.session_transaction() as sess:
|
|
||||||
sess["user_id"] = alice_id
|
|
||||||
sess["pw_changed_at"] = marker
|
|
||||||
sess["admin"] = False
|
|
||||||
|
|
||||||
response = client.get("/servers?prefill_blueprint_id=1")
|
|
||||||
csp = response.headers.get("Content-Security-Policy", "")
|
|
||||||
nonce_match = re.search(r"'nonce-([^']+)'", csp)
|
|
||||||
assert nonce_match, f"expected nonce in CSP, got: {csp}"
|
|
||||||
nonce = nonce_match.group(1)
|
|
||||||
body = response.get_data(as_text=True)
|
|
||||||
if "<script>" in body:
|
|
||||||
# An un-nonced inline script slipped through.
|
|
||||||
pytest.fail("inline <script> without nonce attribute")
|
|
||||||
if "prefill_blueprint_id" in body and "showModal" in body:
|
|
||||||
assert f'nonce="{nonce}"' in body
|
|
||||||
|
|
||||||
|
|
||||||
def test_404_response_is_generic(client) -> None:
|
|
||||||
response = client.get("/this-route-does-not-exist")
|
|
||||||
assert response.status_code == 404
|
|
||||||
body = response.get_data(as_text=True)
|
|
||||||
assert "Traceback" not in body
|
|
||||||
assert "werkzeug" not in body.lower()
|
|
||||||
|
|
||||||
|
|
||||||
def test_500_response_does_not_leak_traceback(tmp_path, monkeypatch) -> None:
|
|
||||||
from l4d2web.app import create_app
|
|
||||||
db_url = f"sqlite:///{tmp_path/'boom.db'}"
|
|
||||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
||||||
boom_app = create_app({
|
|
||||||
"TESTING": True,
|
|
||||||
"DATABASE_URL": db_url,
|
|
||||||
"SECRET_KEY": "test",
|
|
||||||
"PROPAGATE_EXCEPTIONS": False,
|
|
||||||
})
|
|
||||||
|
|
||||||
@boom_app.route("/__boom__")
|
|
||||||
def boom():
|
|
||||||
raise RuntimeError("internal detail that must not leak")
|
|
||||||
|
|
||||||
response = boom_app.test_client().get("/__boom__")
|
|
||||||
assert response.status_code == 500
|
|
||||||
body = response.get_data(as_text=True)
|
|
||||||
assert "internal detail" not in body
|
|
||||||
assert "Traceback" not in body
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue