From ec74563705245f2f314203041c53a9af91580ece Mon Sep 17 00:00:00 2001 From: mwiegand Date: Thu, 23 Apr 2026 01:19:29 +0200 Subject: [PATCH] feat(l4d2-web): add csrf, rate limiting, and sqlite reliability settings --- components/l4d2-web-app/src/l4d2web/app.py | 27 ++++++++++++++++- components/l4d2-web-app/src/l4d2web/db.py | 4 +++ .../src/l4d2web/routes/auth_routes.py | 25 ++++++++++++++++ .../l4d2-web-app/tests/test_blueprints.py | 11 +++++-- .../l4d2-web-app/tests/test_overlays.py | 3 ++ .../l4d2-web-app/tests/test_security.py | 30 +++++++++++++++++++ components/l4d2-web-app/tests/test_servers.py | 16 ++++++++-- 7 files changed, 111 insertions(+), 5 deletions(-) create mode 100644 components/l4d2-web-app/tests/test_security.py diff --git a/components/l4d2-web-app/src/l4d2web/app.py b/components/l4d2-web-app/src/l4d2web/app.py index dde4467..a5901a3 100644 --- a/components/l4d2-web-app/src/l4d2web/app.py +++ b/components/l4d2-web-app/src/l4d2web/app.py @@ -1,6 +1,7 @@ import os +import secrets -from flask import Flask, jsonify +from flask import Flask, Response, jsonify, request, session from l4d2web.auth import load_current_user from l4d2web.cli import register_cli @@ -8,6 +9,7 @@ from l4d2web.config import DEFAULT_CONFIG from l4d2web.db import init_db from l4d2web.routes.blueprint_routes import bp as blueprint_bp from l4d2web.routes.auth_routes import bp as auth_bp +from l4d2web.routes.auth_routes import reset_login_rate_limits from l4d2web.routes.job_routes import bp as job_bp from l4d2web.routes.log_routes import bp as log_bp from l4d2web.routes.overlay_routes import bp as overlay_bp @@ -18,12 +20,33 @@ from l4d2web.services.job_worker import recover_stale_jobs def create_app(test_config: dict[str, object] | None = None) -> Flask: app = Flask(__name__) app.config.from_mapping(DEFAULT_CONFIG) + app.config.update( + SESSION_COOKIE_HTTPONLY=True, + SESSION_COOKIE_SAMESITE="Lax", + CSRF_EXEMPT_PATHS={"/login", "/signup", "/health"}, + ) if test_config is not None: app.config.update(test_config) os.environ["DATABASE_URL"] = str(app.config["DATABASE_URL"]) init_db() + @app.before_request + def csrf_protect() -> Response | None: + if "csrf_token" not in session: + session["csrf_token"] = secrets.token_hex(16) + + if request.method not in {"POST", "PUT", "PATCH", "DELETE"}: + return None + + if request.path in app.config["CSRF_EXEMPT_PATHS"]: + return None + + token = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token") + if token != session.get("csrf_token"): + return Response("csrf token missing or invalid", status=400) + return None + app.before_request(load_current_user) app.register_blueprint(auth_bp) app.register_blueprint(overlay_bp) @@ -32,6 +55,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: app.register_blueprint(job_bp) app.register_blueprint(log_bp) register_cli(app) + if app.config.get("TESTING"): + reset_login_rate_limits() recover_stale_jobs() @app.get("/health") diff --git a/components/l4d2-web-app/src/l4d2web/db.py b/components/l4d2-web-app/src/l4d2web/db.py index 78647de..ae1c538 100644 --- a/components/l4d2-web-app/src/l4d2web/db.py +++ b/components/l4d2-web-app/src/l4d2web/db.py @@ -23,6 +23,10 @@ def get_engine(): if _engine is None or _engine_url != db_url: connect_args = {"check_same_thread": False} if db_url.startswith("sqlite") else {} _engine = create_engine(db_url, connect_args=connect_args) + if db_url.startswith("sqlite"): + with _engine.connect() as conn: + conn.exec_driver_sql("PRAGMA journal_mode=WAL;") + conn.exec_driver_sql("PRAGMA busy_timeout=5000;") _engine_url = db_url _Session = sessionmaker(bind=_engine, expire_on_commit=False) return _engine diff --git a/components/l4d2-web-app/src/l4d2web/routes/auth_routes.py b/components/l4d2-web-app/src/l4d2web/routes/auth_routes.py index 19cbf80..e894bbb 100644 --- a/components/l4d2-web-app/src/l4d2web/routes/auth_routes.py +++ b/components/l4d2-web-app/src/l4d2web/routes/auth_routes.py @@ -1,3 +1,5 @@ +import time + from flask import Blueprint, Response, request, redirect from sqlalchemy import select @@ -7,6 +9,24 @@ from l4d2web.models import User bp = Blueprint("auth", __name__) +LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60 +LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20 +LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {} + + +def reset_login_rate_limits() -> None: + LOGIN_ATTEMPTS_BY_IP.clear() + + +def is_login_rate_limited(remote_addr: str) -> bool: + now = time.time() + attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, []) + cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS + attempts[:] = [ts for ts in attempts if ts >= cutoff] + if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS: + return True + attempts.append(now) + return False @bp.get("/signup") @@ -40,6 +60,10 @@ def login_form() -> Response: @bp.post("/login") def login() -> Response: + remote_addr = request.remote_addr or "unknown" + if is_login_rate_limited(remote_addr): + return Response("too many login attempts", status=429) + username = request.form.get("username", "").strip() password = request.form.get("password", "") with session_scope() as db: @@ -47,6 +71,7 @@ def login() -> Response: if user is None or not verify_password(password, user.password_digest): return Response("invalid credentials", status=401) login_user(user.id) + LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None) return redirect("/dashboard") diff --git a/components/l4d2-web-app/tests/test_blueprints.py b/components/l4d2-web-app/tests/test_blueprints.py index 6eb8aaf..8d36d5a 100644 --- a/components/l4d2-web-app/tests/test_blueprints.py +++ b/components/l4d2-web-app/tests/test_blueprints.py @@ -30,6 +30,7 @@ def user_client(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["csrf_token"] = "test-token" return client @@ -56,6 +57,7 @@ def linked_blueprint(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = user_id + sess["csrf_token"] = "test-token" return client, blueprint_id @@ -68,11 +70,16 @@ def test_user_can_create_private_blueprint(user_client) -> None: "overlay_ids": [1, 2], } - response = user_client.post("/blueprints", data=json.dumps(payload), content_type="application/json") + response = user_client.post( + "/blueprints", + data=json.dumps(payload), + content_type="application/json", + headers={"X-CSRF-Token": "test-token"}, + ) assert response.status_code == 201 def test_delete_blueprint_blocked_when_in_use(linked_blueprint) -> None: client, blueprint_id = linked_blueprint - response = client.delete(f"/blueprints/{blueprint_id}") + response = client.delete(f"/blueprints/{blueprint_id}", headers={"X-CSRF-Token": "test-token"}) assert response.status_code == 409 diff --git a/components/l4d2-web-app/tests/test_overlays.py b/components/l4d2-web-app/tests/test_overlays.py index 5de62ca..9e77a2b 100644 --- a/components/l4d2-web-app/tests/test_overlays.py +++ b/components/l4d2-web-app/tests/test_overlays.py @@ -22,6 +22,7 @@ def admin_client(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = admin_id + sess["csrf_token"] = "test-token" return client @@ -29,6 +30,7 @@ def test_admin_can_create_overlay(admin_client) -> None: response = admin_client.post( "/admin/overlays", data={"name": "standard", "path": "/opt/l4d2/overlays/standard"}, + headers={"X-CSRF-Token": "test-token"}, ) assert response.status_code == 302 @@ -37,5 +39,6 @@ def test_overlay_path_must_be_under_root(admin_client) -> None: response = admin_client.post( "/admin/overlays", data={"name": "bad", "path": "/tmp/bad"}, + headers={"X-CSRF-Token": "test-token"}, ) assert response.status_code == 400 diff --git a/components/l4d2-web-app/tests/test_security.py b/components/l4d2-web-app/tests/test_security.py new file mode 100644 index 0000000..441397d --- /dev/null +++ b/components/l4d2-web-app/tests/test_security.py @@ -0,0 +1,30 @@ +import pytest + +from l4d2web.app import create_app +from l4d2web.auth import hash_password +from l4d2web.db import init_db, session_scope +from l4d2web.models import User + + +@pytest.fixture +def client(tmp_path, monkeypatch): + db_url = f"sqlite:///{tmp_path/'security.db'}" + monkeypatch.setenv("DATABASE_URL", db_url) + app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) + init_db() + with session_scope() as session: + session.add(User(username="alice", password_digest=hash_password("secret"), admin=False)) + return app.test_client() + + +def test_csrf_required(client) -> None: + response = client.post("/servers", data={"name": "x"}) + assert response.status_code == 400 + + +def test_login_rate_limit(client) -> None: + for _ in range(20): + client.post("/login", data={"username": "x", "password": "y"}) + + response = client.post("/login", data={"username": "x", "password": "y"}) + assert response.status_code == 429 diff --git a/components/l4d2-web-app/tests/test_servers.py b/components/l4d2-web-app/tests/test_servers.py index 72446b7..38d7b90 100644 --- a/components/l4d2-web-app/tests/test_servers.py +++ b/components/l4d2-web-app/tests/test_servers.py @@ -32,6 +32,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch): client = app.test_client() with client.session_transaction() as sess: sess["user_id"] = payload["user_id"] + sess["csrf_token"] = "test-token" return client, payload @@ -39,7 +40,12 @@ def user_client_with_blueprints(tmp_path, monkeypatch): def test_create_server_from_blueprint(user_client_with_blueprints) -> None: client, data = user_client_with_blueprints payload = {"name": "alpha", "port": 27015, "blueprint_id": data["blueprint_id"]} - response = client.post("/servers", data=json.dumps(payload), content_type="application/json") + response = client.post( + "/servers", + data=json.dumps(payload), + content_type="application/json", + headers={"X-CSRF-Token": "test-token"}, + ) assert response.status_code == 201 @@ -47,7 +53,12 @@ def test_reassign_blueprint_anytime(user_client_with_blueprints) -> None: client, data = user_client_with_blueprints create_payload = {"name": "alpha", "port": 27015, "blueprint_id": data["blueprint_id"]} - create_response = client.post("/servers", data=json.dumps(create_payload), content_type="application/json") + create_response = client.post( + "/servers", + data=json.dumps(create_payload), + content_type="application/json", + headers={"X-CSRF-Token": "test-token"}, + ) server_id = create_response.get_json()["id"] patch_payload = {"blueprint_id": data["other_blueprint_id"]} @@ -55,5 +66,6 @@ def test_reassign_blueprint_anytime(user_client_with_blueprints) -> None: f"/servers/{server_id}", data=json.dumps(patch_payload), content_type="application/json", + headers={"X-CSRF-Token": "test-token"}, ) assert response.status_code == 200