Compare commits

..

13 commits

Author SHA1 Message Date
mwiegand
cb52a69faf
tests/test_profile: hoist sqlalchemy import to module top
ruff E402: import was after non-import top-level code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:01:29 +02:00
mwiegand
f643246a84
cli: apply min-length password policy in create-user
Same validate_new_password used by the web change-password flow,
so the policy is enforced uniformly across CLI and HTTP entry
points. Existing CLI tests bumped to passwords that satisfy the
new floor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:00:16 +02:00
mwiegand
224b023ca0
profile: rate-limit test for POST /profile/password
Exceeding the per-IP attempt cap within the window returns 429.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:58:46 +02:00
mwiegand
47722dbb19
profile: happy-path + cross-session invalidation tests
Verifies that on a successful change the digest rotates, the
password_changed_at advances, this session keeps working with the
re-stamped marker, and a parallel session forged from the
pre-change marker is rejected by load_current_user.

profile_password_change now writes a naive password_changed_at so
the in-memory marker matches what SQLite returns on next read.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:58:26 +02:00
mwiegand
d25fb57f30
profile: POST /profile/password validation branches
Implements the change-password endpoint:
- Per-IP rate limit reusing services/rate_limit
- Required fields, mismatched-confirm, policy, wrong-current
  branches each redirect with a specific ?error= key
- Rotates digest + password_changed_at, then re-stamps the
  current session marker so this browser stays logged in
  while other sessions get rejected by load_current_user

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:57:11 +02:00
mwiegand
eef85f36a9
profile: GET /profile page with change-password form
Adds the page reachable from the username link in the header.
Renders the form skeleton; the POST handler lands in the next
commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:55:34 +02:00
mwiegand
e75f379dcb
auth: reject sessions older than user.password_changed_at
load_current_user now treats a session whose pw_changed_at marker
is missing, malformed, or older than the user's current
password_changed_at as logged-out. Same shape as the existing
user.active check.

Forced fan-out updates to every test fixture that forges a session
via session_transaction(): each now stamps a current pw_changed_at
marker. test_deactivated_user_existing_session_invalidated keeps
its meaning — the deactivation still flips the user to inactive,
and load_current_user rejects the session via the user.active
branch before reaching the freshness branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:54:13 +02:00
mwiegand
84dc672180
auth: stamp password_changed_at marker in session on login
login_user now records the user's current password_changed_at on the
session. The next commit will use this marker to invalidate sessions
whose password has been rotated under them.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:46:20 +02:00
mwiegand
26a6a9d7b0
rate-limit: extract generic helper, reuse from login
Pulled the per-IP sliding-window check out of auth_routes so the
upcoming /profile/password endpoint can share it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:45:51 +02:00
mwiegand
a5982941df
auth: validate_new_password helper (min length 8)
Single source of truth for the password policy, to be reused by the
upcoming /profile/password endpoint and (optionally) the create-user
CLI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:45:03 +02:00
mwiegand
2353378b23
alembic: add users.password_changed_at column
Backfills existing rows from created_at, then enforces NOT NULL.
Existing sessions without a pw_changed_at marker will be rejected
on next request once the freshness check lands (one-time forced
re-login post-deploy).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:44:39 +02:00
mwiegand
eb1f2b82eb
models: add User.password_changed_at
First step of the self-service password-change feature: a timestamp
that backs the per-session freshness check used to invalidate other
sessions on password change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:43:25 +02:00
mwiegand
6eb9bd0ab3
docs: plan for profile page with self-service password change
Adds /profile reachable via header username, with change-password form
as its first section. Industry-standard session semantics: other sessions
invalidated on password change, current session kept, via new
users.password_changed_at column + session marker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:41:25 +02:00
25 changed files with 1943 additions and 22 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,32 @@
"""users.password_changed_at
Revision ID: 0009_user_password_changed_at
Revises: 0008_user_active
Create Date: 2026-05-11
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0009_user_password_changed_at"
down_revision: Union[str, Sequence[str], None] = "0008_user_active"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("users") as batch_op:
batch_op.add_column(sa.Column("password_changed_at", sa.DateTime(), nullable=True))
op.execute(
"UPDATE users SET password_changed_at = created_at "
"WHERE password_changed_at IS NULL"
)
with op.batch_alter_table("users") as batch_op:
batch_op.alter_column("password_changed_at", nullable=False)
def downgrade() -> None:
with op.batch_alter_table("users") as batch_op:
batch_op.drop_column("password_changed_at")

View file

@ -16,6 +16,8 @@ from l4d2web.routes.job_routes import bp as job_bp
from l4d2web.routes.log_routes import bp as log_bp
from l4d2web.routes.overlay_routes import bp as overlay_bp
from l4d2web.routes.page_routes import bp as page_bp
from l4d2web.routes.profile_routes import bp as profile_bp
from l4d2web.routes.profile_routes import reset_profile_password_rate_limits
from l4d2web.routes.server_routes import bp as server_bp
from l4d2web.routes.workshop_routes import bp as workshop_bp
from l4d2web.services.job_worker import (
@ -82,9 +84,11 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
app.register_blueprint(job_bp)
app.register_blueprint(log_bp)
app.register_blueprint(page_bp)
app.register_blueprint(profile_bp)
register_cli(app)
if app.config.get("TESTING"):
reset_login_rate_limits()
reset_profile_password_rate_limits()
should_start_workers = (
app.config.get("JOB_WORKER_ENABLED")
and not app.config.get("TESTING")

View file

@ -1,3 +1,4 @@
from datetime import datetime
from functools import wraps
from typing import Callable, TypeVar
from urllib.parse import quote, unquote
@ -21,6 +22,17 @@ def verify_password(raw: str, digest: str) -> bool:
return check_password_hash(digest, raw)
MIN_PASSWORD_LENGTH = 8
def validate_new_password(raw: str) -> str | None:
if raw == "":
return "password must not be empty"
if len(raw) < MIN_PASSWORD_LENGTH:
return f"password must be at least {MIN_PASSWORD_LENGTH} characters"
return None
def load_current_user() -> None:
user_id = session.get("user_id")
if user_id is None:
@ -28,17 +40,38 @@ def load_current_user() -> None:
return
with session_scope() as db:
user = db.scalar(select(User).where(User.id == int(user_id)))
# Treat deactivated users as logged-out so existing sessions stop
# working as soon as an admin flips active=False.
g.user = user if user is not None and user.active else None
if user is None or not user.active:
g.user = None
return
marker = session.get("pw_changed_at")
if marker is None:
g.user = None
return
try:
marker_dt = datetime.fromisoformat(marker)
except ValueError:
g.user = None
return
# user.password_changed_at comes back naive from SQLite; strip tz from the
# marker so an aware-marker session (just stamped from an in-memory user)
# compares cleanly with a freshly-loaded user row.
if marker_dt.tzinfo is not None:
marker_dt = marker_dt.replace(tzinfo=None)
if marker_dt < user.password_changed_at:
g.user = None
return
g.user = user
def current_user() -> User | None:
return getattr(g, "user", None)
def login_user(user_id: int) -> None:
def login_user(user_id: int, password_changed_at) -> None:
session["user_id"] = user_id
session["pw_changed_at"] = password_changed_at.isoformat()
def logout_user() -> None:

View file

@ -5,7 +5,7 @@ import click
from sqlalchemy.exc import IntegrityError
from sqlalchemy import select
from l4d2web.auth import hash_password
from l4d2web.auth import hash_password, validate_new_password
from l4d2web.db import session_scope
from l4d2web.models import Overlay, User
from l4d2web.services.overlay_creation import (
@ -31,8 +31,9 @@ def create_user(username: str, admin: bool) -> None:
password = os.getenv("LEFT4ME_ADMIN_PASSWORD")
if password is None:
password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
if password == "":
raise click.ClickException("password must not be empty")
policy_error = validate_new_password(password)
if policy_error is not None:
raise click.ClickException(policy_error)
try:
with session_scope() as db:

View file

@ -35,6 +35,7 @@ class User(Base):
)
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
password_changed_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
class Overlay(Base):

View file

@ -1,16 +1,15 @@
import time
from flask import Blueprint, Response, redirect, render_template, request
from sqlalchemy import select
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
from l4d2web.db import session_scope
from l4d2web.models import User
from l4d2web.services.rate_limit import check_rate_limit
bp = Blueprint("auth", __name__)
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
@ -20,14 +19,12 @@ def reset_login_rate_limits() -> None:
def is_login_rate_limited(remote_addr: str) -> bool:
now = time.time()
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, [])
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS
attempts[:] = [ts for ts in attempts if ts >= cutoff]
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
return True
attempts.append(now)
return False
return check_rate_limit(
LOGIN_ATTEMPTS_BY_IP,
remote_addr,
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
)
@bp.get("/login")
@ -52,7 +49,7 @@ def login() -> Response:
# Same generic response for missing user, wrong password, or
# deactivated account — no timing oracle for deactivation status.
return Response("invalid credentials", status=401)
login_user(user.id)
login_user(user.id, user.password_changed_at)
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
next_target = request.form.get("next", "")
return redirect(next_target if is_safe_next(next_target) else "/dashboard")

View file

@ -0,0 +1,93 @@
from flask import Blueprint, Response, redirect, render_template, request, session
from sqlalchemy import select
from l4d2web.auth import (
current_user,
hash_password,
require_login,
validate_new_password,
verify_password,
)
from l4d2web.db import session_scope
from l4d2web.models import User, now_utc
from l4d2web.services.rate_limit import check_rate_limit
bp = Blueprint("profile", __name__)
_ERROR_MESSAGES = {
"fields_required": "Fill in all three fields.",
"mismatch": "New password and confirmation do not match.",
"too_short": "New password must be at least 8 characters.",
"empty": "New password must not be empty.",
"wrong_current": "Current password is incorrect.",
}
PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS = 60.0
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS = 10
PROFILE_PW_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
def reset_profile_password_rate_limits() -> None:
PROFILE_PW_ATTEMPTS_BY_IP.clear()
def _redirect_with_error(error_key: str) -> Response:
return redirect(f"/profile?error={error_key}")
@bp.get("/profile")
@require_login
def profile_page() -> str:
error_key = request.args.get("error", "")
success = request.args.get("success") == "1"
return render_template(
"profile.html",
error_message=_ERROR_MESSAGES.get(error_key, ""),
success=success,
)
@bp.post("/profile/password")
@require_login
def profile_password_change() -> Response:
remote_addr = request.remote_addr or "unknown"
if check_rate_limit(
PROFILE_PW_ATTEMPTS_BY_IP,
remote_addr,
window=PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS,
max_attempts=PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
):
return Response("too many attempts", status=429)
current_password = request.form.get("current_password", "")
new_password = request.form.get("new_password", "")
confirm_new_password = request.form.get("confirm_new_password", "")
if not current_password or not new_password or not confirm_new_password:
return _redirect_with_error("fields_required")
if new_password != confirm_new_password:
return _redirect_with_error("mismatch")
policy_error = validate_new_password(new_password)
if policy_error is not None:
error_key = "empty" if policy_error == "password must not be empty" else "too_short"
return _redirect_with_error(error_key)
actor = current_user()
assert actor is not None # @require_login
with session_scope() as db:
user = db.scalar(select(User).where(User.id == actor.id))
if user is None or not verify_password(current_password, user.password_digest):
return _redirect_with_error("wrong_current")
user.password_digest = hash_password(new_password)
# Strip tz so the marker matches what a subsequent DB read returns
# (SQLite DateTime columns don't preserve tzinfo).
user.password_changed_at = now_utc().replace(tzinfo=None)
new_marker = user.password_changed_at.isoformat()
session["pw_changed_at"] = new_marker
return redirect("/profile?success=1")

View file

@ -0,0 +1,23 @@
import time
def check_rate_limit(
bucket: dict[str, list[float]],
remote_addr: str,
*,
window: float,
max_attempts: int,
) -> bool:
"""Return True if this request should be rate-limited (rejected).
The bucket is owned by the caller so each endpoint can keep an
independent count and tests can reset state cleanly.
"""
now = time.time()
attempts = bucket.setdefault(remote_addr, [])
cutoff = now - window
attempts[:] = [ts for ts in attempts if ts >= cutoff]
if len(attempts) >= max_attempts:
return True
attempts.append(now)
return False

View file

@ -24,7 +24,7 @@
{% if g.user %}
<nav class="account-nav" aria-label="Account navigation">
{% if g.user.admin %}<a href="/admin">admin</a>{% endif %}
<span class="muted">{{ g.user.username }}</span>
<a class="muted" href="/profile">{{ g.user.username }}</a>
<form method="post" action="/logout" class="inline-form">
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
<button class="link-button" type="submit">logout</button>

View file

@ -0,0 +1,33 @@
{% extends "base.html" %}
{% block title %}Profile | left4me{% endblock %}
{% block content %}
<section class="panel">
<h1>Change password</h1>
{% if success %}
<p class="muted">Password changed.</p>
{% endif %}
{% if error_message %}
<p class="error">{{ error_message }}</p>
{% endif %}
<form method="post" action="/profile/password" class="form">
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
<label>
Current password
<input type="password" name="current_password" autocomplete="current-password" required>
</label>
<label>
New password
<input type="password" name="new_password" autocomplete="new-password" required>
</label>
<label>
Confirm new password
<input type="password" name="confirm_new_password" autocomplete="new-password" required>
</label>
<button type="submit">Change password</button>
</form>
</section>
{% endblock %}

View file

@ -25,10 +25,12 @@ def admin_client(tmp_path, monkeypatch):
db.add_all([admin, second_admin])
db.flush()
admin_id = admin.id
admin_marker = admin.password_changed_at.isoformat()
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = admin_marker
sess["csrf_token"] = "test-token"
return client, admin_id
@ -124,11 +126,15 @@ def test_deactivated_user_existing_session_invalidated(admin_client):
"""An active session at the moment of deactivation stops working."""
client, _ = admin_client
target = _add_user("bob")
with session_scope() as db:
bob = db.scalar(select(User).where(User.id == target))
bob_marker = bob.password_changed_at.isoformat()
# Forge a session for bob.
bob_client = client.application.test_client()
with bob_client.session_transaction() as sess:
sess["user_id"] = target
sess["pw_changed_at"] = bob_marker
sess["csrf_token"] = "test-token"
# Sanity: bob can hit a logged-in route.

View file

@ -134,10 +134,75 @@ def test_login_sets_session(client) -> None:
assert sess.get("user_id") is not None
def test_load_current_user_rejects_missing_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
with client.session_transaction() as sess:
sess["user_id"] = uid
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_rejects_stale_marker(client) -> None:
from datetime import UTC, datetime, timedelta
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = stale.isoformat()
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_accepts_current_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
response = client.get("/dashboard")
assert response.status_code == 200
def test_login_stamps_password_changed_at_in_session(client) -> None:
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
with client.session_transaction() as sess:
marker = sess.get("pw_changed_at")
assert marker is not None
with session_scope() as session:
user = session.query(User).filter_by(username="alice").one()
assert marker == user.password_changed_at.isoformat()
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
@ -150,6 +215,19 @@ def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> Non
assert user.admin is True
def test_create_user_cli_rejects_short_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'short_pw.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "short7x")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
assert result.exit_code != 0
assert "at least 8" in result.output
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
@ -163,10 +241,26 @@ def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatc
assert "password must not be empty" in result.output
def test_validate_new_password_rejects_empty():
from l4d2web.auth import validate_new_password
assert validate_new_password("") == "password must not be empty"
def test_validate_new_password_rejects_short():
from l4d2web.auth import MIN_PASSWORD_LENGTH, validate_new_password
assert MIN_PASSWORD_LENGTH == 8
assert validate_new_password("a" * 7) == "password must be at least 8 characters"
def test_validate_new_password_accepts_min_length():
from l4d2web.auth import validate_new_password
assert validate_new_password("a" * 8) is None
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
with session_scope() as session:

View file

@ -1,4 +1,5 @@
import json
from datetime import UTC, datetime
import pytest
from sqlalchemy import select
@ -31,6 +32,7 @@ def user_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -58,6 +60,7 @@ def linked_blueprint(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client, blueprint_id

View file

@ -1,4 +1,5 @@
from pathlib import Path
from datetime import UTC, datetime
from sqlalchemy import text
@ -66,6 +67,7 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
assert response.status_code == 200
@ -76,6 +78,7 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
text = response.get_data(as_text=True)
@ -91,6 +94,7 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
text = response.get_data(as_text=True)

View file

@ -18,3 +18,23 @@ def test_create_user_and_blueprint(tmp_path, monkeypatch) -> None:
assert user.id is not None
assert blueprint.id is not None
def test_user_has_password_changed_at_default(tmp_path, monkeypatch):
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
db_url = f"sqlite:///{tmp_path/'pw.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
before = datetime.now(UTC).replace(tzinfo=None)
with session_scope() as db:
db.add(User(username="alice", password_digest=hash_password("secret")))
with session_scope() as db:
user = db.query(User).filter_by(username="alice").one()
assert user.password_changed_at is not None
assert user.password_changed_at >= before

View file

@ -1,5 +1,6 @@
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
from __future__ import annotations
from datetime import UTC, datetime
import io
import os
@ -34,6 +35,7 @@ def _client_for(app, user_id: int):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client

View file

@ -4,6 +4,7 @@ from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
from l4d2web.services.security import validate_overlay_ref
from datetime import UTC, datetime
@pytest.fixture
@ -23,6 +24,7 @@ def admin_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -48,6 +50,7 @@ def user_client_with_overlay(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -146,6 +149,7 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc
c = app.test_client()
with c.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return c

View file

@ -1,5 +1,6 @@
import pytest
from pathlib import Path
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -35,6 +36,7 @@ def auth_client_with_server(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client
@ -61,6 +63,7 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = owner_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, blueprint_id
@ -345,6 +348,7 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
admin_page = client.get("/admin")
assert admin_page.status_code == 200
@ -380,6 +384,7 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}")
@ -402,6 +407,7 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
@ -457,6 +463,7 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None:
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
logged_in_response = client.get("/")
assert logged_in_response.status_code == 302
@ -514,6 +521,7 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
admin_client = app.test_client()
with admin_client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db:
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
@ -541,6 +549,7 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
user_client = app.test_client()
with user_client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db:
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
@ -717,6 +726,7 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = other_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/overlays/{overlay_id}/jobs")
assert response.status_code == 403

View file

@ -0,0 +1,258 @@
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import User
@pytest.fixture
def app_and_user(tmp_path, monkeypatch):
db_url = f"sqlite:///{tmp_path/'profile.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("currentpass"))
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
return app, uid, marker
def _logged_in_client(app, uid, marker):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
sess["csrf_token"] = "test-token"
return client
def test_profile_requires_login(app_and_user):
app, _, _ = app_and_user
response = app.test_client().get("/profile", follow_redirects=False)
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_profile_page_renders(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
response = client.get("/profile")
assert response.status_code == 200
body = response.get_data(as_text=True)
assert "Change password" in body
assert 'name="current_password"' in body
assert 'name="new_password"' in body
assert 'name="confirm_new_password"' in body
def test_base_template_links_username_to_profile(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
response = client.get("/dashboard")
body = response.get_data(as_text=True)
assert 'href="/profile"' in body
assert ">alice<" in body
def _post_pw(client, **fields):
payload = {"csrf_token": "test-token", **fields}
return client.post("/profile/password", data=payload, follow_redirects=False)
def _digest_of(username):
with session_scope() as db:
u = db.scalar(select(User).where(User.username == username))
return u.password_digest
def test_post_password_rejects_missing_csrf(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = client.post(
"/profile/password",
data={
"current_password": "currentpass",
"new_password": "newpass12",
"confirm_new_password": "newpass12",
},
)
assert response.status_code == 400
assert _digest_of("alice") == before
def test_post_password_requires_login(app_and_user):
app, _, _ = app_and_user
client = app.test_client()
with client.session_transaction() as sess:
sess["csrf_token"] = "test-token"
response = client.post(
"/profile/password",
data={
"csrf_token": "test-token",
"current_password": "x",
"new_password": "y" * 8,
"confirm_new_password": "y" * 8,
},
follow_redirects=False,
)
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_post_password_empty_fields_redirects_with_error(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
assert response.status_code == 302
assert "/profile?error=fields_required" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_mismatched_confirm(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass99",
)
assert response.status_code == 302
assert "/profile?error=mismatch" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_too_short(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="currentpass",
new_password="short7x",
confirm_new_password="short7x",
)
assert response.status_code == 302
assert "/profile?error=too_short" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_wrong_current(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
assert "/profile?error=wrong_current" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_happy_path_rotates_and_keeps_current_session(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before_digest = _digest_of("alice")
with session_scope() as db:
before_marker = db.scalar(select(User).where(User.id == uid)).password_changed_at
response = _post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
assert response.headers["Location"].endswith("/profile?success=1")
after_digest = _digest_of("alice")
assert after_digest != before_digest
with session_scope() as db:
u = db.scalar(select(User).where(User.id == uid))
assert u.password_changed_at > before_marker
new_marker = u.password_changed_at.isoformat()
follow = client.get("/dashboard")
assert follow.status_code == 200
with client.session_transaction() as sess:
assert sess["pw_changed_at"] == new_marker
def test_other_session_dies_after_password_change(app_and_user):
app, uid, marker = app_and_user
primary = _logged_in_client(app, uid, marker)
other = _logged_in_client(app, uid, marker)
pre = other.get("/dashboard")
assert pre.status_code == 200
response = _post_pw(
primary,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
post = other.get("/dashboard", follow_redirects=False)
assert post.status_code == 302
assert "/login" in post.headers["Location"]
def test_new_password_works_for_login(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
_post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
fresh = app.test_client()
response = fresh.post(
"/login",
data={"username": "alice", "password": "newpass12"},
)
assert response.status_code == 302
assert response.headers["Location"].endswith("/dashboard")
def test_post_password_rate_limited(app_and_user):
from l4d2web.routes.profile_routes import (
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
reset_profile_password_rate_limits,
)
reset_profile_password_rate_limits()
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
for _ in range(PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS):
_post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
blocked = _post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert blocked.status_code == 429

View file

@ -0,0 +1,31 @@
import time
from l4d2web.services.rate_limit import check_rate_limit
def test_under_threshold_allows():
bucket: dict[str, list[float]] = {}
for _ in range(3):
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
def test_at_threshold_blocks():
bucket: dict[str, list[float]] = {}
for _ in range(5):
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is True
def test_other_ips_independent():
bucket: dict[str, list[float]] = {}
for _ in range(5):
check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5)
assert check_rate_limit(bucket, "5.6.7.8", window=60.0, max_attempts=5) is False
def test_old_attempts_expire():
bucket: dict[str, list[float]] = {}
for _ in range(5):
check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5)
time.sleep(0.1)
assert check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) is False

View file

@ -1,6 +1,7 @@
"""Routes for type='script' overlays: create, /script (update body),
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
from __future__ import annotations
from datetime import UTC, datetime
import pytest
from sqlalchemy import select
@ -52,6 +53,7 @@ def _client_for(app, user_id: int):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client

View file

@ -1,4 +1,5 @@
import json
from datetime import UTC, datetime
import pytest
@ -32,6 +33,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = payload["user_id"]
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client, payload
@ -52,6 +54,7 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
response = client.get("/servers")
@ -235,6 +238,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess:
sess["user_id"] = alice_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
alice_resp = client.post(
"/servers",
@ -245,6 +249,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess:
sess["user_id"] = bob_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
bob_resp = client.post(
"/servers",
@ -318,6 +323,7 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
first = client.post(

View file

@ -1,4 +1,5 @@
import pytest
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -32,6 +33,7 @@ def owner_client_with_server(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, server_id

View file

@ -1,6 +1,7 @@
"""Tests for the workshop overlay routes (add items, remove items, build,
admin refresh)."""
from __future__ import annotations
from datetime import UTC, datetime
from typing import Iterable
from unittest.mock import patch
@ -54,6 +55,7 @@ def env_user(tmp_path, monkeypatch):
c = app.test_client()
with c.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return c