feat(l4d2-web): add csrf, rate limiting, and sqlite reliability settings

This commit is contained in:
mwiegand 2026-04-23 01:19:29 +02:00
parent f9c98506bd
commit ec74563705
No known key found for this signature in database
7 changed files with 111 additions and 5 deletions

View file

@ -1,6 +1,7 @@
import os
import secrets
from flask import Flask, jsonify
from flask import Flask, Response, jsonify, request, session
from l4d2web.auth import load_current_user
from l4d2web.cli import register_cli
@ -8,6 +9,7 @@ from l4d2web.config import DEFAULT_CONFIG
from l4d2web.db import init_db
from l4d2web.routes.blueprint_routes import bp as blueprint_bp
from l4d2web.routes.auth_routes import bp as auth_bp
from l4d2web.routes.auth_routes import reset_login_rate_limits
from l4d2web.routes.job_routes import bp as job_bp
from l4d2web.routes.log_routes import bp as log_bp
from l4d2web.routes.overlay_routes import bp as overlay_bp
@ -18,12 +20,33 @@ from l4d2web.services.job_worker import recover_stale_jobs
def create_app(test_config: dict[str, object] | None = None) -> Flask:
app = Flask(__name__)
app.config.from_mapping(DEFAULT_CONFIG)
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
CSRF_EXEMPT_PATHS={"/login", "/signup", "/health"},
)
if test_config is not None:
app.config.update(test_config)
os.environ["DATABASE_URL"] = str(app.config["DATABASE_URL"])
init_db()
@app.before_request
def csrf_protect() -> Response | None:
if "csrf_token" not in session:
session["csrf_token"] = secrets.token_hex(16)
if request.method not in {"POST", "PUT", "PATCH", "DELETE"}:
return None
if request.path in app.config["CSRF_EXEMPT_PATHS"]:
return None
token = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token")
if token != session.get("csrf_token"):
return Response("csrf token missing or invalid", status=400)
return None
app.before_request(load_current_user)
app.register_blueprint(auth_bp)
app.register_blueprint(overlay_bp)
@ -32,6 +55,8 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
app.register_blueprint(job_bp)
app.register_blueprint(log_bp)
register_cli(app)
if app.config.get("TESTING"):
reset_login_rate_limits()
recover_stale_jobs()
@app.get("/health")

View file

@ -23,6 +23,10 @@ def get_engine():
if _engine is None or _engine_url != db_url:
connect_args = {"check_same_thread": False} if db_url.startswith("sqlite") else {}
_engine = create_engine(db_url, connect_args=connect_args)
if db_url.startswith("sqlite"):
with _engine.connect() as conn:
conn.exec_driver_sql("PRAGMA journal_mode=WAL;")
conn.exec_driver_sql("PRAGMA busy_timeout=5000;")
_engine_url = db_url
_Session = sessionmaker(bind=_engine, expire_on_commit=False)
return _engine

View file

@ -1,3 +1,5 @@
import time
from flask import Blueprint, Response, request, redirect
from sqlalchemy import select
@ -7,6 +9,24 @@ from l4d2web.models import User
bp = Blueprint("auth", __name__)
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
def reset_login_rate_limits() -> None:
LOGIN_ATTEMPTS_BY_IP.clear()
def is_login_rate_limited(remote_addr: str) -> bool:
now = time.time()
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, [])
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS
attempts[:] = [ts for ts in attempts if ts >= cutoff]
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
return True
attempts.append(now)
return False
@bp.get("/signup")
@ -40,6 +60,10 @@ def login_form() -> Response:
@bp.post("/login")
def login() -> Response:
remote_addr = request.remote_addr or "unknown"
if is_login_rate_limited(remote_addr):
return Response("too many login attempts", status=429)
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
with session_scope() as db:
@ -47,6 +71,7 @@ def login() -> Response:
if user is None or not verify_password(password, user.password_digest):
return Response("invalid credentials", status=401)
login_user(user.id)
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
return redirect("/dashboard")

View file

@ -30,6 +30,7 @@ def user_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["csrf_token"] = "test-token"
return client
@ -56,6 +57,7 @@ def linked_blueprint(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["csrf_token"] = "test-token"
return client, blueprint_id
@ -68,11 +70,16 @@ def test_user_can_create_private_blueprint(user_client) -> None:
"overlay_ids": [1, 2],
}
response = user_client.post("/blueprints", data=json.dumps(payload), content_type="application/json")
response = user_client.post(
"/blueprints",
data=json.dumps(payload),
content_type="application/json",
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 201
def test_delete_blueprint_blocked_when_in_use(linked_blueprint) -> None:
client, blueprint_id = linked_blueprint
response = client.delete(f"/blueprints/{blueprint_id}")
response = client.delete(f"/blueprints/{blueprint_id}", headers={"X-CSRF-Token": "test-token"})
assert response.status_code == 409

View file

@ -22,6 +22,7 @@ def admin_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["csrf_token"] = "test-token"
return client
@ -29,6 +30,7 @@ def test_admin_can_create_overlay(admin_client) -> None:
response = admin_client.post(
"/admin/overlays",
data={"name": "standard", "path": "/opt/l4d2/overlays/standard"},
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 302
@ -37,5 +39,6 @@ def test_overlay_path_must_be_under_root(admin_client) -> None:
response = admin_client.post(
"/admin/overlays",
data={"name": "bad", "path": "/tmp/bad"},
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 400

View file

@ -0,0 +1,30 @@
import pytest
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import User
@pytest.fixture
def client(tmp_path, monkeypatch):
db_url = f"sqlite:///{tmp_path/'security.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret"), admin=False))
return app.test_client()
def test_csrf_required(client) -> None:
response = client.post("/servers", data={"name": "x"})
assert response.status_code == 400
def test_login_rate_limit(client) -> None:
for _ in range(20):
client.post("/login", data={"username": "x", "password": "y"})
response = client.post("/login", data={"username": "x", "password": "y"})
assert response.status_code == 429

View file

@ -32,6 +32,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = payload["user_id"]
sess["csrf_token"] = "test-token"
return client, payload
@ -39,7 +40,12 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
def test_create_server_from_blueprint(user_client_with_blueprints) -> None:
client, data = user_client_with_blueprints
payload = {"name": "alpha", "port": 27015, "blueprint_id": data["blueprint_id"]}
response = client.post("/servers", data=json.dumps(payload), content_type="application/json")
response = client.post(
"/servers",
data=json.dumps(payload),
content_type="application/json",
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 201
@ -47,7 +53,12 @@ def test_reassign_blueprint_anytime(user_client_with_blueprints) -> None:
client, data = user_client_with_blueprints
create_payload = {"name": "alpha", "port": 27015, "blueprint_id": data["blueprint_id"]}
create_response = client.post("/servers", data=json.dumps(create_payload), content_type="application/json")
create_response = client.post(
"/servers",
data=json.dumps(create_payload),
content_type="application/json",
headers={"X-CSRF-Token": "test-token"},
)
server_id = create_response.get_json()["id"]
patch_payload = {"blueprint_id": data["other_blueprint_id"]}
@ -55,5 +66,6 @@ def test_reassign_blueprint_anytime(user_client_with_blueprints) -> None:
f"/servers/{server_id}",
data=json.dumps(patch_payload),
content_type="application/json",
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 200