Compare commits

..

No commits in common. "cb52a69faf23355f310eaf319e750b7b522997b4" and "e1149704c8e280baef83b04aaaa85bc410ecaeaf" have entirely different histories.

25 changed files with 22 additions and 1943 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,32 +0,0 @@
"""users.password_changed_at
Revision ID: 0009_user_password_changed_at
Revises: 0008_user_active
Create Date: 2026-05-11
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = "0009_user_password_changed_at"
down_revision: Union[str, Sequence[str], None] = "0008_user_active"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("users") as batch_op:
batch_op.add_column(sa.Column("password_changed_at", sa.DateTime(), nullable=True))
op.execute(
"UPDATE users SET password_changed_at = created_at "
"WHERE password_changed_at IS NULL"
)
with op.batch_alter_table("users") as batch_op:
batch_op.alter_column("password_changed_at", nullable=False)
def downgrade() -> None:
with op.batch_alter_table("users") as batch_op:
batch_op.drop_column("password_changed_at")

View file

@ -16,8 +16,6 @@ from l4d2web.routes.job_routes import bp as job_bp
from l4d2web.routes.log_routes import bp as log_bp
from l4d2web.routes.overlay_routes import bp as overlay_bp
from l4d2web.routes.page_routes import bp as page_bp
from l4d2web.routes.profile_routes import bp as profile_bp
from l4d2web.routes.profile_routes import reset_profile_password_rate_limits
from l4d2web.routes.server_routes import bp as server_bp
from l4d2web.routes.workshop_routes import bp as workshop_bp
from l4d2web.services.job_worker import (
@ -84,11 +82,9 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
app.register_blueprint(job_bp)
app.register_blueprint(log_bp)
app.register_blueprint(page_bp)
app.register_blueprint(profile_bp)
register_cli(app)
if app.config.get("TESTING"):
reset_login_rate_limits()
reset_profile_password_rate_limits()
should_start_workers = (
app.config.get("JOB_WORKER_ENABLED")
and not app.config.get("TESTING")

View file

@ -1,4 +1,3 @@
from datetime import datetime
from functools import wraps
from typing import Callable, TypeVar
from urllib.parse import quote, unquote
@ -22,17 +21,6 @@ def verify_password(raw: str, digest: str) -> bool:
return check_password_hash(digest, raw)
MIN_PASSWORD_LENGTH = 8
def validate_new_password(raw: str) -> str | None:
if raw == "":
return "password must not be empty"
if len(raw) < MIN_PASSWORD_LENGTH:
return f"password must be at least {MIN_PASSWORD_LENGTH} characters"
return None
def load_current_user() -> None:
user_id = session.get("user_id")
if user_id is None:
@ -40,38 +28,17 @@ def load_current_user() -> None:
return
with session_scope() as db:
user = db.scalar(select(User).where(User.id == int(user_id)))
if user is None or not user.active:
g.user = None
return
marker = session.get("pw_changed_at")
if marker is None:
g.user = None
return
try:
marker_dt = datetime.fromisoformat(marker)
except ValueError:
g.user = None
return
# user.password_changed_at comes back naive from SQLite; strip tz from the
# marker so an aware-marker session (just stamped from an in-memory user)
# compares cleanly with a freshly-loaded user row.
if marker_dt.tzinfo is not None:
marker_dt = marker_dt.replace(tzinfo=None)
if marker_dt < user.password_changed_at:
g.user = None
return
g.user = user
# Treat deactivated users as logged-out so existing sessions stop
# working as soon as an admin flips active=False.
g.user = user if user is not None and user.active else None
def current_user() -> User | None:
return getattr(g, "user", None)
def login_user(user_id: int, password_changed_at) -> None:
def login_user(user_id: int) -> None:
session["user_id"] = user_id
session["pw_changed_at"] = password_changed_at.isoformat()
def logout_user() -> None:

View file

@ -5,7 +5,7 @@ import click
from sqlalchemy.exc import IntegrityError
from sqlalchemy import select
from l4d2web.auth import hash_password, validate_new_password
from l4d2web.auth import hash_password
from l4d2web.db import session_scope
from l4d2web.models import Overlay, User
from l4d2web.services.overlay_creation import (
@ -31,9 +31,8 @@ def create_user(username: str, admin: bool) -> None:
password = os.getenv("LEFT4ME_ADMIN_PASSWORD")
if password is None:
password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
policy_error = validate_new_password(password)
if policy_error is not None:
raise click.ClickException(policy_error)
if password == "":
raise click.ClickException("password must not be empty")
try:
with session_scope() as db:

View file

@ -35,7 +35,6 @@ class User(Base):
)
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
password_changed_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
class Overlay(Base):

View file

@ -1,15 +1,16 @@
import time
from flask import Blueprint, Response, redirect, render_template, request
from sqlalchemy import select
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
from l4d2web.db import session_scope
from l4d2web.models import User
from l4d2web.services.rate_limit import check_rate_limit
bp = Blueprint("auth", __name__)
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
@ -19,12 +20,14 @@ def reset_login_rate_limits() -> None:
def is_login_rate_limited(remote_addr: str) -> bool:
return check_rate_limit(
LOGIN_ATTEMPTS_BY_IP,
remote_addr,
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
)
now = time.time()
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, [])
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS
attempts[:] = [ts for ts in attempts if ts >= cutoff]
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
return True
attempts.append(now)
return False
@bp.get("/login")
@ -49,7 +52,7 @@ def login() -> Response:
# Same generic response for missing user, wrong password, or
# deactivated account — no timing oracle for deactivation status.
return Response("invalid credentials", status=401)
login_user(user.id, user.password_changed_at)
login_user(user.id)
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
next_target = request.form.get("next", "")
return redirect(next_target if is_safe_next(next_target) else "/dashboard")

View file

@ -1,93 +0,0 @@
from flask import Blueprint, Response, redirect, render_template, request, session
from sqlalchemy import select
from l4d2web.auth import (
current_user,
hash_password,
require_login,
validate_new_password,
verify_password,
)
from l4d2web.db import session_scope
from l4d2web.models import User, now_utc
from l4d2web.services.rate_limit import check_rate_limit
bp = Blueprint("profile", __name__)
_ERROR_MESSAGES = {
"fields_required": "Fill in all three fields.",
"mismatch": "New password and confirmation do not match.",
"too_short": "New password must be at least 8 characters.",
"empty": "New password must not be empty.",
"wrong_current": "Current password is incorrect.",
}
PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS = 60.0
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS = 10
PROFILE_PW_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
def reset_profile_password_rate_limits() -> None:
PROFILE_PW_ATTEMPTS_BY_IP.clear()
def _redirect_with_error(error_key: str) -> Response:
return redirect(f"/profile?error={error_key}")
@bp.get("/profile")
@require_login
def profile_page() -> str:
error_key = request.args.get("error", "")
success = request.args.get("success") == "1"
return render_template(
"profile.html",
error_message=_ERROR_MESSAGES.get(error_key, ""),
success=success,
)
@bp.post("/profile/password")
@require_login
def profile_password_change() -> Response:
remote_addr = request.remote_addr or "unknown"
if check_rate_limit(
PROFILE_PW_ATTEMPTS_BY_IP,
remote_addr,
window=PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS,
max_attempts=PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
):
return Response("too many attempts", status=429)
current_password = request.form.get("current_password", "")
new_password = request.form.get("new_password", "")
confirm_new_password = request.form.get("confirm_new_password", "")
if not current_password or not new_password or not confirm_new_password:
return _redirect_with_error("fields_required")
if new_password != confirm_new_password:
return _redirect_with_error("mismatch")
policy_error = validate_new_password(new_password)
if policy_error is not None:
error_key = "empty" if policy_error == "password must not be empty" else "too_short"
return _redirect_with_error(error_key)
actor = current_user()
assert actor is not None # @require_login
with session_scope() as db:
user = db.scalar(select(User).where(User.id == actor.id))
if user is None or not verify_password(current_password, user.password_digest):
return _redirect_with_error("wrong_current")
user.password_digest = hash_password(new_password)
# Strip tz so the marker matches what a subsequent DB read returns
# (SQLite DateTime columns don't preserve tzinfo).
user.password_changed_at = now_utc().replace(tzinfo=None)
new_marker = user.password_changed_at.isoformat()
session["pw_changed_at"] = new_marker
return redirect("/profile?success=1")

View file

@ -1,23 +0,0 @@
import time
def check_rate_limit(
bucket: dict[str, list[float]],
remote_addr: str,
*,
window: float,
max_attempts: int,
) -> bool:
"""Return True if this request should be rate-limited (rejected).
The bucket is owned by the caller so each endpoint can keep an
independent count and tests can reset state cleanly.
"""
now = time.time()
attempts = bucket.setdefault(remote_addr, [])
cutoff = now - window
attempts[:] = [ts for ts in attempts if ts >= cutoff]
if len(attempts) >= max_attempts:
return True
attempts.append(now)
return False

View file

@ -24,7 +24,7 @@
{% if g.user %}
<nav class="account-nav" aria-label="Account navigation">
{% if g.user.admin %}<a href="/admin">admin</a>{% endif %}
<a class="muted" href="/profile">{{ g.user.username }}</a>
<span class="muted">{{ g.user.username }}</span>
<form method="post" action="/logout" class="inline-form">
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
<button class="link-button" type="submit">logout</button>

View file

@ -1,33 +0,0 @@
{% extends "base.html" %}
{% block title %}Profile | left4me{% endblock %}
{% block content %}
<section class="panel">
<h1>Change password</h1>
{% if success %}
<p class="muted">Password changed.</p>
{% endif %}
{% if error_message %}
<p class="error">{{ error_message }}</p>
{% endif %}
<form method="post" action="/profile/password" class="form">
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
<label>
Current password
<input type="password" name="current_password" autocomplete="current-password" required>
</label>
<label>
New password
<input type="password" name="new_password" autocomplete="new-password" required>
</label>
<label>
Confirm new password
<input type="password" name="confirm_new_password" autocomplete="new-password" required>
</label>
<button type="submit">Change password</button>
</form>
</section>
{% endblock %}

View file

@ -25,12 +25,10 @@ def admin_client(tmp_path, monkeypatch):
db.add_all([admin, second_admin])
db.flush()
admin_id = admin.id
admin_marker = admin.password_changed_at.isoformat()
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = admin_marker
sess["csrf_token"] = "test-token"
return client, admin_id
@ -126,15 +124,11 @@ def test_deactivated_user_existing_session_invalidated(admin_client):
"""An active session at the moment of deactivation stops working."""
client, _ = admin_client
target = _add_user("bob")
with session_scope() as db:
bob = db.scalar(select(User).where(User.id == target))
bob_marker = bob.password_changed_at.isoformat()
# Forge a session for bob.
bob_client = client.application.test_client()
with bob_client.session_transaction() as sess:
sess["user_id"] = target
sess["pw_changed_at"] = bob_marker
sess["csrf_token"] = "test-token"
# Sanity: bob can hit a logged-in route.

View file

@ -134,75 +134,10 @@ def test_login_sets_session(client) -> None:
assert sess.get("user_id") is not None
def test_load_current_user_rejects_missing_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
with client.session_transaction() as sess:
sess["user_id"] = uid
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_rejects_stale_marker(client) -> None:
from datetime import UTC, datetime, timedelta
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = stale.isoformat()
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_load_current_user_accepts_current_marker(client) -> None:
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("secret"))
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
response = client.get("/dashboard")
assert response.status_code == 200
def test_login_stamps_password_changed_at_in_session(client) -> None:
with session_scope() as session:
session.add(User(username="alice", password_digest=hash_password("secret")))
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
with client.session_transaction() as sess:
marker = sess.get("pw_changed_at")
assert marker is not None
with session_scope() as session:
user = session.query(User).filter_by(username="alice").one()
assert marker == user.password_changed_at.isoformat()
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
@ -215,19 +150,6 @@ def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> Non
assert user.admin is True
def test_create_user_cli_rejects_short_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'short_pw.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "short7x")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
assert result.exit_code != 0
assert "at least 8" in result.output
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
@ -241,26 +163,10 @@ def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatc
assert "password must not be empty" in result.output
def test_validate_new_password_rejects_empty():
from l4d2web.auth import validate_new_password
assert validate_new_password("") == "password must not be empty"
def test_validate_new_password_rejects_short():
from l4d2web.auth import MIN_PASSWORD_LENGTH, validate_new_password
assert MIN_PASSWORD_LENGTH == 8
assert validate_new_password("a" * 7) == "password must be at least 8 characters"
def test_validate_new_password_accepts_min_length():
from l4d2web.auth import validate_new_password
assert validate_new_password("a" * 8) is None
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
with session_scope() as session:

View file

@ -1,5 +1,4 @@
import json
from datetime import UTC, datetime
import pytest
from sqlalchemy import select
@ -32,7 +31,6 @@ def user_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -60,7 +58,6 @@ def linked_blueprint(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client, blueprint_id

View file

@ -1,5 +1,4 @@
from pathlib import Path
from datetime import UTC, datetime
from sqlalchemy import text
@ -67,7 +66,6 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
assert response.status_code == 200
@ -78,7 +76,6 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
text = response.get_data(as_text=True)
@ -94,7 +91,6 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
text = response.get_data(as_text=True)

View file

@ -18,23 +18,3 @@ def test_create_user_and_blueprint(tmp_path, monkeypatch) -> None:
assert user.id is not None
assert blueprint.id is not None
def test_user_has_password_changed_at_default(tmp_path, monkeypatch):
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
db_url = f"sqlite:///{tmp_path/'pw.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
before = datetime.now(UTC).replace(tzinfo=None)
with session_scope() as db:
db.add(User(username="alice", password_digest=hash_password("secret")))
with session_scope() as db:
user = db.query(User).filter_by(username="alice").one()
assert user.password_changed_at is not None
assert user.password_changed_at >= before

View file

@ -1,6 +1,5 @@
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
from __future__ import annotations
from datetime import UTC, datetime
import io
import os
@ -35,7 +34,6 @@ def _client_for(app, user_id: int):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client

View file

@ -4,7 +4,6 @@ from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
from l4d2web.services.security import validate_overlay_ref
from datetime import UTC, datetime
@pytest.fixture
@ -24,7 +23,6 @@ def admin_client(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -50,7 +48,6 @@ def user_client_with_overlay(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client
@ -149,7 +146,6 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc
c = app.test_client()
with c.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return c

View file

@ -1,6 +1,5 @@
import pytest
from pathlib import Path
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -36,7 +35,6 @@ def auth_client_with_server(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client
@ -63,7 +61,6 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = owner_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, blueprint_id
@ -348,7 +345,6 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
admin_page = client.get("/admin")
assert admin_page.status_code == 200
@ -384,7 +380,6 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/jobs/{job_id}")
@ -407,7 +402,6 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
@ -463,7 +457,6 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None:
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
logged_in_response = client.get("/")
assert logged_in_response.status_code == 302
@ -521,7 +514,6 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
admin_client = app.test_client()
with admin_client.session_transaction() as sess:
sess["user_id"] = admin_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db:
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
@ -549,7 +541,6 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
user_client = app.test_client()
with user_client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
with session_scope() as db:
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
@ -726,7 +717,6 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = other_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
response = client.get(f"/overlays/{overlay_id}/jobs")
assert response.status_code == 403

View file

@ -1,258 +0,0 @@
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import User
@pytest.fixture
def app_and_user(tmp_path, monkeypatch):
db_url = f"sqlite:///{tmp_path/'profile.db'}"
monkeypatch.setenv("DATABASE_URL", db_url)
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
init_db()
with session_scope() as db:
u = User(username="alice", password_digest=hash_password("currentpass"))
db.add(u)
db.flush()
uid = u.id
marker = u.password_changed_at.isoformat()
return app, uid, marker
def _logged_in_client(app, uid, marker):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = marker
sess["csrf_token"] = "test-token"
return client
def test_profile_requires_login(app_and_user):
app, _, _ = app_and_user
response = app.test_client().get("/profile", follow_redirects=False)
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_profile_page_renders(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
response = client.get("/profile")
assert response.status_code == 200
body = response.get_data(as_text=True)
assert "Change password" in body
assert 'name="current_password"' in body
assert 'name="new_password"' in body
assert 'name="confirm_new_password"' in body
def test_base_template_links_username_to_profile(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
response = client.get("/dashboard")
body = response.get_data(as_text=True)
assert 'href="/profile"' in body
assert ">alice<" in body
def _post_pw(client, **fields):
payload = {"csrf_token": "test-token", **fields}
return client.post("/profile/password", data=payload, follow_redirects=False)
def _digest_of(username):
with session_scope() as db:
u = db.scalar(select(User).where(User.username == username))
return u.password_digest
def test_post_password_rejects_missing_csrf(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = client.post(
"/profile/password",
data={
"current_password": "currentpass",
"new_password": "newpass12",
"confirm_new_password": "newpass12",
},
)
assert response.status_code == 400
assert _digest_of("alice") == before
def test_post_password_requires_login(app_and_user):
app, _, _ = app_and_user
client = app.test_client()
with client.session_transaction() as sess:
sess["csrf_token"] = "test-token"
response = client.post(
"/profile/password",
data={
"csrf_token": "test-token",
"current_password": "x",
"new_password": "y" * 8,
"confirm_new_password": "y" * 8,
},
follow_redirects=False,
)
assert response.status_code == 302
assert "/login" in response.headers["Location"]
def test_post_password_empty_fields_redirects_with_error(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
assert response.status_code == 302
assert "/profile?error=fields_required" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_mismatched_confirm(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass99",
)
assert response.status_code == 302
assert "/profile?error=mismatch" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_too_short(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="currentpass",
new_password="short7x",
confirm_new_password="short7x",
)
assert response.status_code == 302
assert "/profile?error=too_short" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_wrong_current(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before = _digest_of("alice")
response = _post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
assert "/profile?error=wrong_current" in response.headers["Location"]
assert _digest_of("alice") == before
def test_post_password_happy_path_rotates_and_keeps_current_session(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
before_digest = _digest_of("alice")
with session_scope() as db:
before_marker = db.scalar(select(User).where(User.id == uid)).password_changed_at
response = _post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
assert response.headers["Location"].endswith("/profile?success=1")
after_digest = _digest_of("alice")
assert after_digest != before_digest
with session_scope() as db:
u = db.scalar(select(User).where(User.id == uid))
assert u.password_changed_at > before_marker
new_marker = u.password_changed_at.isoformat()
follow = client.get("/dashboard")
assert follow.status_code == 200
with client.session_transaction() as sess:
assert sess["pw_changed_at"] == new_marker
def test_other_session_dies_after_password_change(app_and_user):
app, uid, marker = app_and_user
primary = _logged_in_client(app, uid, marker)
other = _logged_in_client(app, uid, marker)
pre = other.get("/dashboard")
assert pre.status_code == 200
response = _post_pw(
primary,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert response.status_code == 302
post = other.get("/dashboard", follow_redirects=False)
assert post.status_code == 302
assert "/login" in post.headers["Location"]
def test_new_password_works_for_login(app_and_user):
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
_post_pw(
client,
current_password="currentpass",
new_password="newpass12",
confirm_new_password="newpass12",
)
fresh = app.test_client()
response = fresh.post(
"/login",
data={"username": "alice", "password": "newpass12"},
)
assert response.status_code == 302
assert response.headers["Location"].endswith("/dashboard")
def test_post_password_rate_limited(app_and_user):
from l4d2web.routes.profile_routes import (
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
reset_profile_password_rate_limits,
)
reset_profile_password_rate_limits()
app, uid, marker = app_and_user
client = _logged_in_client(app, uid, marker)
for _ in range(PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS):
_post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
blocked = _post_pw(
client,
current_password="WRONG",
new_password="newpass12",
confirm_new_password="newpass12",
)
assert blocked.status_code == 429

View file

@ -1,31 +0,0 @@
import time
from l4d2web.services.rate_limit import check_rate_limit
def test_under_threshold_allows():
bucket: dict[str, list[float]] = {}
for _ in range(3):
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
def test_at_threshold_blocks():
bucket: dict[str, list[float]] = {}
for _ in range(5):
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is True
def test_other_ips_independent():
bucket: dict[str, list[float]] = {}
for _ in range(5):
check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5)
assert check_rate_limit(bucket, "5.6.7.8", window=60.0, max_attempts=5) is False
def test_old_attempts_expire():
bucket: dict[str, list[float]] = {}
for _ in range(5):
check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5)
time.sleep(0.1)
assert check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) is False

View file

@ -1,7 +1,6 @@
"""Routes for type='script' overlays: create, /script (update body),
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
from __future__ import annotations
from datetime import UTC, datetime
import pytest
from sqlalchemy import select
@ -53,7 +52,6 @@ def _client_for(app, user_id: int):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client

View file

@ -1,5 +1,4 @@
import json
from datetime import UTC, datetime
import pytest
@ -33,7 +32,6 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = payload["user_id"]
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return client, payload
@ -54,7 +52,6 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
response = client.get("/servers")
@ -238,7 +235,6 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess:
sess["user_id"] = alice_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
alice_resp = client.post(
"/servers",
@ -249,7 +245,6 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
with client.session_transaction() as sess:
sess["user_id"] = bob_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
bob_resp = client.post(
"/servers",
@ -323,7 +318,6 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
first = client.post(

View file

@ -1,5 +1,4 @@
import pytest
from datetime import UTC, datetime
from l4d2web.app import create_app
from l4d2web.auth import hash_password
@ -33,7 +32,6 @@ def owner_client_with_server(tmp_path, monkeypatch):
client = app.test_client()
with client.session_transaction() as sess:
sess["user_id"] = user_id
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
return client, server_id

View file

@ -1,7 +1,6 @@
"""Tests for the workshop overlay routes (add items, remove items, build,
admin refresh)."""
from __future__ import annotations
from datetime import UTC, datetime
from typing import Iterable
from unittest.mock import patch
@ -55,7 +54,6 @@ def env_user(tmp_path, monkeypatch):
c = app.test_client()
with c.session_transaction() as sess:
sess["user_id"] = uid
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
sess["csrf_token"] = "test-token"
return c