Compare commits

...

13 commits

Author SHA1 Message Date
mwiegand
cb52a69faf
tests/test_profile: hoist sqlalchemy import to module top
ruff E402: import was after non-import top-level code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:01:29 +02:00
mwiegand
f643246a84
cli: apply min-length password policy in create-user
Same validate_new_password used by the web change-password flow,
so the policy is enforced uniformly across CLI and HTTP entry
points. Existing CLI tests bumped to passwords that satisfy the
new floor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:00:16 +02:00
mwiegand
224b023ca0
profile: rate-limit test for POST /profile/password
Exceeding the per-IP attempt cap within the window returns 429.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:58:46 +02:00
mwiegand
47722dbb19
profile: happy-path + cross-session invalidation tests
Verifies that on a successful change the digest rotates, the
password_changed_at advances, this session keeps working with the
re-stamped marker, and a parallel session forged from the
pre-change marker is rejected by load_current_user.

profile_password_change now writes a naive password_changed_at so
the in-memory marker matches what SQLite returns on next read.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:58:26 +02:00
mwiegand
d25fb57f30
profile: POST /profile/password validation branches
Implements the change-password endpoint:
- Per-IP rate limit reusing services/rate_limit
- Required fields, mismatched-confirm, policy, wrong-current
  branches each redirect with a specific ?error= key
- Rotates digest + password_changed_at, then re-stamps the
  current session marker so this browser stays logged in
  while other sessions get rejected by load_current_user

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:57:11 +02:00
mwiegand
eef85f36a9
profile: GET /profile page with change-password form
Adds the page reachable from the username link in the header.
Renders the form skeleton; the POST handler lands in the next
commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:55:34 +02:00
mwiegand
e75f379dcb
auth: reject sessions older than user.password_changed_at
load_current_user now treats a session whose pw_changed_at marker
is missing, malformed, or older than the user's current
password_changed_at as logged-out. Same shape as the existing
user.active check.

Forced fan-out updates to every test fixture that forges a session
via session_transaction(): each now stamps a current pw_changed_at
marker. test_deactivated_user_existing_session_invalidated keeps
its meaning — the deactivation still flips the user to inactive,
and load_current_user rejects the session via the user.active
branch before reaching the freshness branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:54:13 +02:00
mwiegand
84dc672180
auth: stamp password_changed_at marker in session on login
login_user now records the user's current password_changed_at on the
session. The next commit will use this marker to invalidate sessions
whose password has been rotated under them.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:46:20 +02:00
mwiegand
26a6a9d7b0
rate-limit: extract generic helper, reuse from login
Pulled the per-IP sliding-window check out of auth_routes so the
upcoming /profile/password endpoint can share it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:45:51 +02:00
mwiegand
a5982941df
auth: validate_new_password helper (min length 8)
Single source of truth for the password policy, to be reused by the
upcoming /profile/password endpoint and (optionally) the create-user
CLI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:45:03 +02:00
mwiegand
2353378b23
alembic: add users.password_changed_at column
Backfills existing rows from created_at, then enforces NOT NULL.
Existing sessions without a pw_changed_at marker will be rejected
on next request once the freshness check lands (one-time forced
re-login post-deploy).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:44:39 +02:00
mwiegand
eb1f2b82eb
models: add User.password_changed_at
First step of the self-service password-change feature: a timestamp
that backs the per-session freshness check used to invalidate other
sessions on password change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:43:25 +02:00
mwiegand
6eb9bd0ab3
docs: plan for profile page with self-service password change
Adds /profile reachable via header username, with change-password form
as its first section. Industry-standard session semantics: other sessions
invalidated on password change, current session kept, via new
users.password_changed_at column + session marker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 21:41:25 +02:00
25 changed files with 1943 additions and 22 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,32 @@
"""users.password_changed_at
Revision ID: 0009_user_password_changed_at
Revises: 0008_user_active
Create Date: 2026-05-11
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0009_user_password_changed_at"
down_revision: Union[str, Sequence[str], None] = "0008_user_active"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("users") as batch_op:
batch_op.add_column(sa.Column("password_changed_at", sa.DateTime(), nullable=True))
op.execute(
"UPDATE users SET password_changed_at = created_at "
"WHERE password_changed_at IS NULL"
)
with op.batch_alter_table("users") as batch_op:
batch_op.alter_column("password_changed_at", nullable=False)
def downgrade() -> None:
with op.batch_alter_table("users") as batch_op:
batch_op.drop_column("password_changed_at")

View file

@ -16,6 +16,8 @@ from l4d2web.routes.job_routes import bp as job_bp
from l4d2web.routes.log_routes import bp as log_bp from l4d2web.routes.log_routes import bp as log_bp
from l4d2web.routes.overlay_routes import bp as overlay_bp from l4d2web.routes.overlay_routes import bp as overlay_bp
from l4d2web.routes.page_routes import bp as page_bp from l4d2web.routes.page_routes import bp as page_bp
from l4d2web.routes.profile_routes import bp as profile_bp
from l4d2web.routes.profile_routes import reset_profile_password_rate_limits
from l4d2web.routes.server_routes import bp as server_bp from l4d2web.routes.server_routes import bp as server_bp
from l4d2web.routes.workshop_routes import bp as workshop_bp from l4d2web.routes.workshop_routes import bp as workshop_bp
from l4d2web.services.job_worker import ( from l4d2web.services.job_worker import (
@ -82,9 +84,11 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
app.register_blueprint(job_bp) app.register_blueprint(job_bp)
app.register_blueprint(log_bp) app.register_blueprint(log_bp)
app.register_blueprint(page_bp) app.register_blueprint(page_bp)
app.register_blueprint(profile_bp)
register_cli(app) register_cli(app)
if app.config.get("TESTING"): if app.config.get("TESTING"):
reset_login_rate_limits() reset_login_rate_limits()
reset_profile_password_rate_limits()
should_start_workers = ( should_start_workers = (
app.config.get("JOB_WORKER_ENABLED") app.config.get("JOB_WORKER_ENABLED")
and not app.config.get("TESTING") and not app.config.get("TESTING")

View file

@ -1,3 +1,4 @@
from datetime import datetime
from functools import wraps from functools import wraps
from typing import Callable, TypeVar from typing import Callable, TypeVar
from urllib.parse import quote, unquote from urllib.parse import quote, unquote
@ -21,6 +22,17 @@ def verify_password(raw: str, digest: str) -> bool:
return check_password_hash(digest, raw) return check_password_hash(digest, raw)
MIN_PASSWORD_LENGTH = 8
def validate_new_password(raw: str) -> str | None:
if raw == "":
return "password must not be empty"
if len(raw) < MIN_PASSWORD_LENGTH:
return f"password must be at least {MIN_PASSWORD_LENGTH} characters"
return None
def load_current_user() -> None: def load_current_user() -> None:
user_id = session.get("user_id") user_id = session.get("user_id")
if user_id is None: if user_id is None:
@ -28,17 +40,38 @@ def load_current_user() -> None:
return return
with session_scope() as db: with session_scope() as db:
user = db.scalar(select(User).where(User.id == int(user_id))) user = db.scalar(select(User).where(User.id == int(user_id)))
# Treat deactivated users as logged-out so existing sessions stop if user is None or not user.active:
# working as soon as an admin flips active=False. g.user = None
g.user = user if user is not None and user.active else None return
marker = session.get("pw_changed_at")
if marker is None:
g.user = None
return
try:
marker_dt = datetime.fromisoformat(marker)
except ValueError:
g.user = None
return
# user.password_changed_at comes back naive from SQLite; strip tz from the
# marker so an aware-marker session (just stamped from an in-memory user)
# compares cleanly with a freshly-loaded user row.
if marker_dt.tzinfo is not None:
marker_dt = marker_dt.replace(tzinfo=None)
if marker_dt < user.password_changed_at:
g.user = None
return
g.user = user
def current_user() -> User | None: def current_user() -> User | None:
return getattr(g, "user", None) return getattr(g, "user", None)
def login_user(user_id: int) -> None: def login_user(user_id: int, password_changed_at) -> None:
session["user_id"] = user_id session["user_id"] = user_id
session["pw_changed_at"] = password_changed_at.isoformat()
def logout_user() -> None: def logout_user() -> None:

View file

@ -5,7 +5,7 @@ import click
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy import select from sqlalchemy import select
from l4d2web.auth import hash_password from l4d2web.auth import hash_password, validate_new_password
from l4d2web.db import session_scope from l4d2web.db import session_scope
from l4d2web.models import Overlay, User from l4d2web.models import Overlay, User
from l4d2web.services.overlay_creation import ( from l4d2web.services.overlay_creation import (
@ -31,8 +31,9 @@ def create_user(username: str, admin: bool) -> None:
password = os.getenv("LEFT4ME_ADMIN_PASSWORD") password = os.getenv("LEFT4ME_ADMIN_PASSWORD")
if password is None: if password is None:
password = click.prompt("Password", hide_input=True, confirmation_prompt=True) password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
if password == "": policy_error = validate_new_password(password)
raise click.ClickException("password must not be empty") if policy_error is not None:
raise click.ClickException(policy_error)
try: try:
with session_scope() as db: with session_scope() as db:

View file

@ -35,6 +35,7 @@ class User(Base):
) )
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False) created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False) updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
password_changed_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
class Overlay(Base): class Overlay(Base):

View file

@ -1,16 +1,15 @@
import time
from flask import Blueprint, Response, redirect, render_template, request from flask import Blueprint, Response, redirect, render_template, request
from sqlalchemy import select from sqlalchemy import select
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
from l4d2web.db import session_scope from l4d2web.db import session_scope
from l4d2web.models import User from l4d2web.models import User
from l4d2web.services.rate_limit import check_rate_limit
bp = Blueprint("auth", __name__) bp = Blueprint("auth", __name__)
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__") _TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60 LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20 LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {} LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
@ -20,14 +19,12 @@ def reset_login_rate_limits() -> None:
def is_login_rate_limited(remote_addr: str) -> bool: def is_login_rate_limited(remote_addr: str) -> bool:
now = time.time() return check_rate_limit(
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, []) LOGIN_ATTEMPTS_BY_IP,
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS remote_addr,
attempts[:] = [ts for ts in attempts if ts >= cutoff] window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS: max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
return True )
attempts.append(now)
return False
@bp.get("/login") @bp.get("/login")
@ -52,7 +49,7 @@ def login() -> Response:
# Same generic response for missing user, wrong password, or # Same generic response for missing user, wrong password, or
# deactivated account — no timing oracle for deactivation status. # deactivated account — no timing oracle for deactivation status.
return Response("invalid credentials", status=401) return Response("invalid credentials", status=401)
login_user(user.id) login_user(user.id, user.password_changed_at)
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None) LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
next_target = request.form.get("next", "") next_target = request.form.get("next", "")
return redirect(next_target if is_safe_next(next_target) else "/dashboard") return redirect(next_target if is_safe_next(next_target) else "/dashboard")

View file

@ -0,0 +1,93 @@
from flask import Blueprint, Response, redirect, render_template, request, session
from sqlalchemy import select
from l4d2web.auth import (
current_user,
hash_password,
require_login,
validate_new_password,
verify_password,
)
from l4d2web.db import session_scope
from l4d2web.models import User, now_utc
from l4d2web.services.rate_limit import check_rate_limit
bp = Blueprint("profile", __name__)
_ERROR_MESSAGES = {
"fields_required": "Fill in all three fields.",
"mismatch": "New password and confirmation do not match.",
"too_short": "New password must be at least 8 characters.",
"empty": "New password must not be empty.",
"wrong_current": "Current password is incorrect.",
}
PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS = 60.0
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS = 10
PROFILE_PW_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
def reset_profile_password_rate_limits() -> None:
PROFILE_PW_ATTEMPTS_BY_IP.clear()
def _redirect_with_error(error_key: str) -> Response:
return redirect(f"/profile?error={error_key}")
@bp.get("/profile")
@require_login
def profile_page() -> str:
error_key = request.args.get("error", "")
success = request.args.get("success") == "1"
return render_template(
"profile.html",
error_message=_ERROR_MESSAGES.get(error_key, ""),
success=success,
)
@bp.post("/profile/password")
@require_login
def profile_password_change() -> Response:
remote_addr = request.remote_addr or "unknown"
if check_rate_limit(
PROFILE_PW_ATTEMPTS_BY_IP,
remote_addr,
window=PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS,
max_attempts=PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
):
return Response("too many attempts", status=429)
current_password = request.form.get("current_password", "")
new_password = request.form.get("new_password", "")
confirm_new_password = request.form.get("confirm_new_password", "")
if not current_password or not new_password or not confirm_new_password:
return _redirect_with_error("fields_required")
if new_password != confirm_new_password:
return _redirect_with_error("mismatch")
policy_error = validate_new_password(new_password)
if policy_error is not None:
error_key = "empty" if policy_error == "password must not be empty" else "too_short"
return _redirect_with_error(error_key)
actor = current_user()
assert actor is not None # @require_login
with session_scope() as db:
user = db.scalar(select(User).where(User.id == actor.id))
if user is None or not verify_password(current_password, user.password_digest):
return _redirect_with_error("wrong_current")
user.password_digest = hash_password(new_password)
# Strip tz so the marker matches what a subsequent DB read returns
# (SQLite DateTime columns don't preserve tzinfo).
user.password_changed_at = now_utc().replace(tzinfo=None)
new_marker = user.password_changed_at.isoformat()
session["pw_changed_at"] = new_marker
return redirect("/profile?success=1")

View file

@ -0,0 +1,23 @@
import time
def check_rate_limit(
bucket: dict[str, list[float]],
remote_addr: str,
*,
window: float,
max_attempts: int,
) -> bool:
"""Return True if this request should be rate-limited (rejected).
The bucket is owned by the caller so each endpoint can keep an
independent count and tests can reset state cleanly.
"""
now = time.time()
attempts = bucket.setdefault(remote_addr, [])
cutoff = now - window
attempts[:] = [ts for ts in attempts if ts >= cutoff]
if len(attempts) >= max_attempts:
return True
attempts.append(now)
return False

View file

@ -24,7 +24,7 @@
{% if g.user %} {% if g.user %}
<nav class="account-nav" aria-label="Account navigation"> <nav class="account-nav" aria-label="Account navigation">
{% if g.user.admin %}<a href="/admin">admin</a>{% endif %} {% if g.user.admin %}<a href="/admin">admin</a>{% endif %}
<span class="muted">{{ g.user.username }}</span> <a class="muted" href="/profile">{{ g.user.username }}</a>
<form method="post" action="/logout" class="inline-form"> <form method="post" action="/logout" class="inline-form">
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}"> <input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
<button class="link-button" type="submit">logout</button> <button class="link-button" type="submit">logout</button>

View file

@ -0,0 +1,33 @@
{% extends "base.html" %}
{% block title %}Profile | left4me{% endblock %}
{% block content %}
<section class="panel">
<h1>Change password</h1>
{% if success %}
<p class="muted">Password changed.</p>
{% endif %}
{% if error_message %}
<p class="error">{{ error_message }}</p>
{% endif %}
<form method="post" action="/profile/password" class="form">
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
<label>
Current password
<input type="password" name="current_password" autocomplete="current-password" required>
</label>
<label>
New password
<input type="password" name="new_password" autocomplete="new-password" required>
</label>
<label>
Confirm new password
<input type="password" name="confirm_new_password" autocomplete="new-password" required>
</label>
<button type="submit">Change password</button>
</form>
</section>
{% endblock %}

View file

@ -25,10 +25,12 @@ def admin_client(tmp_path, monkeypatch):
db.add_all([admin, second_admin]) db.add_all([admin, second_admin])
db.flush() db.flush()
admin_id = admin.id admin_id = admin.id
admin_marker = admin.password_changed_at.isoformat()
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = admin_id sess["user_id"] = admin_id
sess["pw_changed_at"] = admin_marker
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client, admin_id return client, admin_id
@ -124,11 +126,15 @@ def test_deactivated_user_existing_session_invalidated(admin_client):
"""An active session at the moment of deactivation stops working.""" """An active session at the moment of deactivation stops working."""
client, _ = admin_client client, _ = admin_client
target = _add_user("bob") target = _add_user("bob")
with session_scope() as db:
bob = db.scalar(select(User).where(User.id == target))
bob_marker = bob.password_changed_at.isoformat()
# Forge a session for bob. # Forge a session for bob.
bob_client = client.application.test_client() bob_client = client.application.test_client()
with bob_client.session_transaction() as sess: with bob_client.session_transaction() as sess:
sess["user_id"] = target sess["user_id"] = target
sess["pw_changed_at"] = bob_marker
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
# Sanity: bob can hit a logged-in route. # Sanity: bob can hit a logged-in route.

View file

@ -134,10 +134,75 @@ def test_login_sets_session(client) -> None:
assert sess.get("user_id") is not None assert sess.get("user_id") is not None
def test_load_current_user_rejects_missing_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
with client.session_transaction() as sess:
sess["user_id"] = uid
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_rejects_stale_marker(client) -> None:
from datetime import UTC, datetime, timedelta
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = stale.isoformat()
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_accepts_current_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
response = client.get("/dashboard")
assert response.status_code == 200
def test_login_stamps_password_changed_at_in_session(client) -> None:
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
with client.session_transaction() as sess:
marker = sess.get("pw_changed_at")
assert marker is not None
with session_scope() as session:
user = session.query(User).filter_by(username="alice").one()
assert marker == user.password_changed_at.isoformat()
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None: def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'create_user.db'}" db_url = f"sqlite:///{tmp_path/'create_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url) monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret") monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db() init_db()
@ -150,6 +215,19 @@ def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> Non
assert user.admin is True assert user.admin is True
def test_create_user_cli_rejects_short_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'short_pw.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "short7x")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
assert result.exit_code != 0
assert "at least 8" in result.output
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None: def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'empty_password.db'}" db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
monkeypatch.setenv("DATABASE_URL", db_url) monkeypatch.setenv("DATABASE_URL", db_url)
@ -163,10 +241,26 @@ def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatc
assert "password must not be empty" in result.output assert "password must not be empty" in result.output
def test_validate_new_password_rejects_empty():
from l4d2web.auth import validate_new_password
assert validate_new_password("") == "password must not be empty"
def test_validate_new_password_rejects_short():
from l4d2web.auth import MIN_PASSWORD_LENGTH, validate_new_password
assert MIN_PASSWORD_LENGTH == 8
assert validate_new_password("a" * 7) == "password must be at least 8 characters"
def test_validate_new_password_accepts_min_length():
from l4d2web.auth import validate_new_password
assert validate_new_password("a" * 8) is None
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None: def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}" db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url) monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret") monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db() init_db()
with session_scope() as session: with session_scope() as session:

View file

@ -1,4 +1,5 @@
import json import json
from datetime import UTC, datetime
import pytest import pytest
from sqlalchemy import select from sqlalchemy import select
@ -31,6 +32,7 @@ def user_client(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client return client
@ -58,6 +60,7 @@ def linked_blueprint(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client, blueprint_id return client, blueprint_id

View file

@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
from datetime import UTC, datetime
from sqlalchemy import text from sqlalchemy import text
@ -66,6 +67,7 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5") response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
assert response.status_code == 200 assert response.status_code == 200
@ -76,6 +78,7 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5") response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
text = response.get_data(as_text=True) text = response.get_data(as_text=True)
@ -91,6 +94,7 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"}) response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
text = response.get_data(as_text=True) text = response.get_data(as_text=True)

View file

@ -18,3 +18,23 @@ def test_create_user_and_blueprint(tmp_path, monkeypatch) -> None:
assert user.id is not None assert user.id is not None
assert blueprint.id is not None assert blueprint.id is not None
def test_user_has_password_changed_at_default(tmp_path, monkeypatch):
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
db_url = f"sqlite:///{tmp_path/'pw.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
before = datetime.now(UTC).replace(tzinfo=None)
with session_scope() as db:
db.add(User(username="alice", password_digest=hash_password("secret")))
with session_scope() as db:
user = db.query(User).filter_by(username="alice").one()
assert user.password_changed_at is not None
assert user.password_changed_at >= before

View file

@ -1,5 +1,6 @@
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes.""" """HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
from __future__ import annotations from __future__ import annotations
from datetime import UTC, datetime
import io import io
import os import os
@ -34,6 +35,7 @@ def _client_for(app, user_id: int):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client return client

View file

@ -4,6 +4,7 @@ from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
from l4d2web.services.security import validate_overlay_ref from l4d2web.services.security import validate_overlay_ref
from datetime import UTC, datetime
@pytest.fixture @pytest.fixture
@ -23,6 +24,7 @@ def admin_client(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = admin_id sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client return client
@ -48,6 +50,7 @@ def user_client_with_overlay(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client return client
@ -146,6 +149,7 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc
c = app.test_client() c = app.test_client()
with c.session_transaction() as sess: with c.session_transaction() as sess:
sess["user_id"] = uid sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return c return c

View file

@ -1,5 +1,6 @@
import pytest import pytest
from pathlib import Path from pathlib import Path
from datetime import UTC, datetime
from l4d2web.app import create_app from l4d2web.app import create_app
from l4d2web.auth import hash_password from l4d2web.auth import hash_password
@ -35,6 +36,7 @@ def auth_client_with_server(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client return client
@ -61,6 +63,7 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = owner_id sess["user_id"] = owner_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, blueprint_id return client, blueprint_id
@ -345,6 +348,7 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = admin_id sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
admin_page = client.get("/admin") admin_page = client.get("/admin")
assert admin_page.status_code == 200 assert admin_page.status_code == 200
@ -380,6 +384,7 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None:
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = admin_id sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}") response = client.get(f"/jobs/{job_id}")
@ -402,6 +407,7 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = admin_id sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"}) response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
@ -457,6 +463,7 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None:
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
logged_in_response = client.get("/") logged_in_response = client.get("/")
assert logged_in_response.status_code == 302 assert logged_in_response.status_code == 302
@ -514,6 +521,7 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
admin_client = app.test_client() admin_client = app.test_client()
with admin_client.session_transaction() as sess: with admin_client.session_transaction() as sess:
sess["user_id"] = admin_id sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db: with session_scope() as db:
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")) db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
@ -541,6 +549,7 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
user_client = app.test_client() user_client = app.test_client()
with user_client.session_transaction() as sess: with user_client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db: with session_scope() as db:
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued") job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
@ -717,6 +726,7 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = other_id sess["user_id"] = other_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/overlays/{overlay_id}/jobs") response = client.get(f"/overlays/{overlay_id}/jobs")
assert response.status_code == 403 assert response.status_code == 403

View file

@ -0,0 +1,258 @@
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import User
@pytest.fixture
def app_and_user(tmp_path, monkeypatch):
db_url = f"sqlite:///{tmp_path/'profile.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("currentpass"))
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
return app, uid, marker
def _logged_in_client(app, uid, marker):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
sess["csrf_token"] = "test-token"
return client
def test_profile_requires_login(app_and_user):
app, _, _ = app_and_user
response = app.test_client().get("/profile", follow_redirects=False)
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_profile_page_renders(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
response = client.get("/profile")
assert response.status_code == 200
body = response.get_data(as_text=True)
assert "Change password" in body
assert 'name="current_password"' in body
assert 'name="new_password"' in body
assert 'name="confirm_new_password"' in body
def test_base_template_links_username_to_profile(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
response = client.get("/dashboard")
body = response.get_data(as_text=True)
assert 'href="/profile"' in body
assert ">alice<" in body
def _post_pw(client, **fields):
payload = {"csrf_token": "test-token", **fields}
return client.post("/profile/password", data=payload, follow_redirects=False)
def _digest_of(username):
with session_scope() as db:
u = db.scalar(select(User).where(User.username == username))
return u.password_digest
def test_post_password_rejects_missing_csrf(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = client.post(
"/profile/password",
data={
"current_password": "currentpass",
"new_password": "newpass12",
"confirm_new_password": "newpass12",
},
)
assert response.status_code == 400
assert _digest_of("alice") == before
def test_post_password_requires_login(app_and_user):
app, _, _ = app_and_user
client = app.test_client()
with client.session_transaction() as sess:
sess["csrf_token"] = "test-token"
response = client.post(
"/profile/password",
data={
"csrf_token": "test-token",
"current_password": "x",
"new_password": "y" * 8,
"confirm_new_password": "y" * 8,
},
follow_redirects=False,
)
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_post_password_empty_fields_redirects_with_error(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
assert response.status_code == 302
assert "/profile?error=fields_required" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_mismatched_confirm(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass99",
)
assert response.status_code == 302
assert "/profile?error=mismatch" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_too_short(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="currentpass",
new_password="short7x",
confirm_new_password="short7x",
)
assert response.status_code == 302
assert "/profile?error=too_short" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_wrong_current(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
assert "/profile?error=wrong_current" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_happy_path_rotates_and_keeps_current_session(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before_digest = _digest_of("alice")
with session_scope() as db:
before_marker = db.scalar(select(User).where(User.id == uid)).password_changed_at
response = _post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
assert response.headers["Location"].endswith("/profile?success=1")
after_digest = _digest_of("alice")
assert after_digest != before_digest
with session_scope() as db:
u = db.scalar(select(User).where(User.id == uid))
assert u.password_changed_at > before_marker
new_marker = u.password_changed_at.isoformat()
follow = client.get("/dashboard")
assert follow.status_code == 200
with client.session_transaction() as sess:
assert sess["pw_changed_at"] == new_marker
def test_other_session_dies_after_password_change(app_and_user):
app, uid, marker = app_and_user
primary = _logged_in_client(app, uid, marker)
other = _logged_in_client(app, uid, marker)
pre = other.get("/dashboard")
assert pre.status_code == 200
response = _post_pw(
primary,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
post = other.get("/dashboard", follow_redirects=False)
assert post.status_code == 302
assert "/login" in post.headers["Location"]
def test_new_password_works_for_login(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
_post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
fresh = app.test_client()
response = fresh.post(
"/login",
data={"username": "alice", "password": "newpass12"},
)
assert response.status_code == 302
assert response.headers["Location"].endswith("/dashboard")
def test_post_password_rate_limited(app_and_user):
from l4d2web.routes.profile_routes import (
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
reset_profile_password_rate_limits,
)
reset_profile_password_rate_limits()
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
for _ in range(PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS):
_post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
blocked = _post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert blocked.status_code == 429

View file

@ -0,0 +1,31 @@
import time
from l4d2web.services.rate_limit import check_rate_limit
def test_under_threshold_allows():
bucket: dict[str, list[float]] = {}
for _ in range(3):
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
def test_at_threshold_blocks():
bucket: dict[str, list[float]] = {}
for _ in range(5):
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is True
def test_other_ips_independent():
bucket: dict[str, list[float]] = {}
for _ in range(5):
check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5)
assert check_rate_limit(bucket, "5.6.7.8", window=60.0, max_attempts=5) is False
def test_old_attempts_expire():
bucket: dict[str, list[float]] = {}
for _ in range(5):
check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5)
time.sleep(0.1)
assert check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) is False

View file

@ -1,6 +1,7 @@
"""Routes for type='script' overlays: create, /script (update body), """Routes for type='script' overlays: create, /script (update body),
/wipe, /build. Permissions mirror workshop overlays (owner or admin).""" /wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
from __future__ import annotations from __future__ import annotations
from datetime import UTC, datetime
import pytest import pytest
from sqlalchemy import select from sqlalchemy import select
@ -52,6 +53,7 @@ def _client_for(app, user_id: int):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client return client

View file

@ -1,4 +1,5 @@
import json import json
from datetime import UTC, datetime
import pytest import pytest
@ -32,6 +33,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = payload["user_id"] sess["user_id"] = payload["user_id"]
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return client, payload return client, payload
@ -52,6 +54,7 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
response = client.get("/servers") response = client.get("/servers")
@ -235,6 +238,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = alice_id sess["user_id"] = alice_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
alice_resp = client.post( alice_resp = client.post(
"/servers", "/servers",
@ -245,6 +249,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = bob_id sess["user_id"] = bob_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
bob_resp = client.post( bob_resp = client.post(
"/servers", "/servers",
@ -318,6 +323,7 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
first = client.post( first = client.post(

View file

@ -1,4 +1,5 @@
import pytest import pytest
from datetime import UTC, datetime
from l4d2web.app import create_app from l4d2web.app import create_app
from l4d2web.auth import hash_password from l4d2web.auth import hash_password
@ -32,6 +33,7 @@ def owner_client_with_server(tmp_path, monkeypatch):
client = app.test_client() client = app.test_client()
with client.session_transaction() as sess: with client.session_transaction() as sess:
sess["user_id"] = user_id sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, server_id return client, server_id

View file

@ -1,6 +1,7 @@
"""Tests for the workshop overlay routes (add items, remove items, build, """Tests for the workshop overlay routes (add items, remove items, build,
admin refresh).""" admin refresh)."""
from __future__ import annotations from __future__ import annotations
from datetime import UTC, datetime
from typing import Iterable from typing import Iterable
from unittest.mock import patch from unittest.mock import patch
@ -54,6 +55,7 @@ def env_user(tmp_path, monkeypatch):
c = app.test_client() c = app.test_client()
with c.session_transaction() as sess: with c.session_transaction() as sess:
sess["user_id"] = uid sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token" sess["csrf_token"] = "test-token"
return c return c