From d25fb57f30a16773b700bfac4da52033806e09cf Mon Sep 17 00:00:00 2001 From: mwiegand Date: Mon, 11 May 2026 21:57:11 +0200 Subject: [PATCH] profile: POST /profile/password validation branches Implements the change-password endpoint: - Per-IP rate limit reusing services/rate_limit - Required fields, mismatched-confirm, policy, wrong-current branches each redirect with a specific ?error= key - Rotates digest + password_changed_at, then re-stamps the current session marker so this browser stays logged in while other sessions get rejected by load_current_user Co-Authored-By: Claude Opus 4.7 (1M context) --- l4d2web/app.py | 2 + l4d2web/routes/profile_routes.py | 68 +++++++++++++++++++- l4d2web/tests/test_profile.py | 104 +++++++++++++++++++++++++++++++ 3 files changed, 172 insertions(+), 2 deletions(-) diff --git a/l4d2web/app.py b/l4d2web/app.py index 670b25d..f952c70 100644 --- a/l4d2web/app.py +++ b/l4d2web/app.py @@ -17,6 +17,7 @@ from l4d2web.routes.log_routes import bp as log_bp from l4d2web.routes.overlay_routes import bp as overlay_bp from l4d2web.routes.page_routes import bp as page_bp from l4d2web.routes.profile_routes import bp as profile_bp +from l4d2web.routes.profile_routes import reset_profile_password_rate_limits from l4d2web.routes.server_routes import bp as server_bp from l4d2web.routes.workshop_routes import bp as workshop_bp from l4d2web.services.job_worker import ( @@ -87,6 +88,7 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: register_cli(app) if app.config.get("TESTING"): reset_login_rate_limits() + reset_profile_password_rate_limits() should_start_workers = ( app.config.get("JOB_WORKER_ENABLED") and not app.config.get("TESTING") diff --git a/l4d2web/routes/profile_routes.py b/l4d2web/routes/profile_routes.py index 5f464e1..3dacfb2 100644 --- a/l4d2web/routes/profile_routes.py +++ b/l4d2web/routes/profile_routes.py @@ -1,6 +1,16 @@ -from flask import Blueprint, render_template, request +from flask import Blueprint, Response, redirect, render_template, request, session +from sqlalchemy import select -from l4d2web.auth import require_login +from l4d2web.auth import ( + current_user, + hash_password, + require_login, + validate_new_password, + verify_password, +) +from l4d2web.db import session_scope +from l4d2web.models import User, now_utc +from l4d2web.services.rate_limit import check_rate_limit bp = Blueprint("profile", __name__) @@ -15,6 +25,19 @@ _ERROR_MESSAGES = { } +PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS = 60.0 +PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS = 10 +PROFILE_PW_ATTEMPTS_BY_IP: dict[str, list[float]] = {} + + +def reset_profile_password_rate_limits() -> None: + PROFILE_PW_ATTEMPTS_BY_IP.clear() + + +def _redirect_with_error(error_key: str) -> Response: + return redirect(f"/profile?error={error_key}") + + @bp.get("/profile") @require_login def profile_page() -> str: @@ -25,3 +48,44 @@ def profile_page() -> str: error_message=_ERROR_MESSAGES.get(error_key, ""), success=success, ) + + +@bp.post("/profile/password") +@require_login +def profile_password_change() -> Response: + remote_addr = request.remote_addr or "unknown" + if check_rate_limit( + PROFILE_PW_ATTEMPTS_BY_IP, + remote_addr, + window=PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS, + max_attempts=PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS, + ): + return Response("too many attempts", status=429) + + current_password = request.form.get("current_password", "") + new_password = request.form.get("new_password", "") + confirm_new_password = request.form.get("confirm_new_password", "") + + if not current_password or not new_password or not confirm_new_password: + return _redirect_with_error("fields_required") + if new_password != confirm_new_password: + return _redirect_with_error("mismatch") + policy_error = validate_new_password(new_password) + if policy_error is not None: + error_key = "empty" if policy_error == "password must not be empty" else "too_short" + return _redirect_with_error(error_key) + + actor = current_user() + assert actor is not None # @require_login + + with session_scope() as db: + user = db.scalar(select(User).where(User.id == actor.id)) + if user is None or not verify_password(current_password, user.password_digest): + return _redirect_with_error("wrong_current") + user.password_digest = hash_password(new_password) + user.password_changed_at = now_utc() + new_marker = user.password_changed_at.isoformat() + + session["pw_changed_at"] = new_marker + + return redirect("/profile?success=1") diff --git a/l4d2web/tests/test_profile.py b/l4d2web/tests/test_profile.py index af88eec..da1ea52 100644 --- a/l4d2web/tests/test_profile.py +++ b/l4d2web/tests/test_profile.py @@ -56,3 +56,107 @@ def test_base_template_links_username_to_profile(app_and_user): body = response.get_data(as_text=True) assert 'href="/profile"' in body assert ">alice<" in body + + +from sqlalchemy import select + + +def _post_pw(client, **fields): + payload = {"csrf_token": "test-token", **fields} + return client.post("/profile/password", data=payload, follow_redirects=False) + + +def _digest_of(username): + with session_scope() as db: + u = db.scalar(select(User).where(User.username == username)) + return u.password_digest + + +def test_post_password_rejects_missing_csrf(app_and_user): + app, uid, marker = app_and_user + client = _logged_in_client(app, uid, marker) + before = _digest_of("alice") + response = client.post( + "/profile/password", + data={ + "current_password": "currentpass", + "new_password": "newpass12", + "confirm_new_password": "newpass12", + }, + ) + assert response.status_code == 400 + assert _digest_of("alice") == before + + +def test_post_password_requires_login(app_and_user): + app, _, _ = app_and_user + client = app.test_client() + with client.session_transaction() as sess: + sess["csrf_token"] = "test-token" + response = client.post( + "/profile/password", + data={ + "csrf_token": "test-token", + "current_password": "x", + "new_password": "y" * 8, + "confirm_new_password": "y" * 8, + }, + follow_redirects=False, + ) + assert response.status_code == 302 + assert "/login" in response.headers["Location"] + + +def test_post_password_empty_fields_redirects_with_error(app_and_user): + app, uid, marker = app_and_user + client = _logged_in_client(app, uid, marker) + before = _digest_of("alice") + response = _post_pw(client, current_password="", new_password="", confirm_new_password="") + assert response.status_code == 302 + assert "/profile?error=fields_required" in response.headers["Location"] + assert _digest_of("alice") == before + + +def test_post_password_mismatched_confirm(app_and_user): + app, uid, marker = app_and_user + client = _logged_in_client(app, uid, marker) + before = _digest_of("alice") + response = _post_pw( + client, + current_password="currentpass", + new_password="newpass12", + confirm_new_password="newpass99", + ) + assert response.status_code == 302 + assert "/profile?error=mismatch" in response.headers["Location"] + assert _digest_of("alice") == before + + +def test_post_password_too_short(app_and_user): + app, uid, marker = app_and_user + client = _logged_in_client(app, uid, marker) + before = _digest_of("alice") + response = _post_pw( + client, + current_password="currentpass", + new_password="short7x", + confirm_new_password="short7x", + ) + assert response.status_code == 302 + assert "/profile?error=too_short" in response.headers["Location"] + assert _digest_of("alice") == before + + +def test_post_password_wrong_current(app_and_user): + app, uid, marker = app_and_user + client = _logged_in_client(app, uid, marker) + before = _digest_of("alice") + response = _post_pw( + client, + current_password="WRONG", + new_password="newpass12", + confirm_new_password="newpass12", + ) + assert response.status_code == 302 + assert "/profile?error=wrong_current" in response.headers["Location"] + assert _digest_of("alice") == before