Compare commits
13 commits
e1149704c8
...
cb52a69faf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb52a69faf | ||
|
|
f643246a84 | ||
|
|
224b023ca0 | ||
|
|
47722dbb19 | ||
|
|
d25fb57f30 | ||
|
|
eef85f36a9 | ||
|
|
e75f379dcb | ||
|
|
84dc672180 | ||
|
|
26a6a9d7b0 | ||
|
|
a5982941df | ||
|
|
2353378b23 | ||
|
|
eb1f2b82eb | ||
|
|
6eb9bd0ab3 |
25 changed files with 1943 additions and 22 deletions
1260
docs/superpowers/plans/2026-05-11-profile-password-change-v1.md
Normal file
1260
docs/superpowers/plans/2026-05-11-profile-password-change-v1.md
Normal file
File diff suppressed because it is too large
Load diff
32
l4d2web/alembic/versions/0009_user_password_changed_at.py
Normal file
32
l4d2web/alembic/versions/0009_user_password_changed_at.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
"""users.password_changed_at
|
||||
|
||||
Revision ID: 0009_user_password_changed_at
|
||||
Revises: 0008_user_active
|
||||
Create Date: 2026-05-11
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
|
||||
revision: str = "0009_user_password_changed_at"
|
||||
down_revision: Union[str, Sequence[str], None] = "0008_user_active"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
with op.batch_alter_table("users") as batch_op:
|
||||
batch_op.add_column(sa.Column("password_changed_at", sa.DateTime(), nullable=True))
|
||||
op.execute(
|
||||
"UPDATE users SET password_changed_at = created_at "
|
||||
"WHERE password_changed_at IS NULL"
|
||||
)
|
||||
with op.batch_alter_table("users") as batch_op:
|
||||
batch_op.alter_column("password_changed_at", nullable=False)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
with op.batch_alter_table("users") as batch_op:
|
||||
batch_op.drop_column("password_changed_at")
|
||||
|
|
@ -16,6 +16,8 @@ from l4d2web.routes.job_routes import bp as job_bp
|
|||
from l4d2web.routes.log_routes import bp as log_bp
|
||||
from l4d2web.routes.overlay_routes import bp as overlay_bp
|
||||
from l4d2web.routes.page_routes import bp as page_bp
|
||||
from l4d2web.routes.profile_routes import bp as profile_bp
|
||||
from l4d2web.routes.profile_routes import reset_profile_password_rate_limits
|
||||
from l4d2web.routes.server_routes import bp as server_bp
|
||||
from l4d2web.routes.workshop_routes import bp as workshop_bp
|
||||
from l4d2web.services.job_worker import (
|
||||
|
|
@ -82,9 +84,11 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
|||
app.register_blueprint(job_bp)
|
||||
app.register_blueprint(log_bp)
|
||||
app.register_blueprint(page_bp)
|
||||
app.register_blueprint(profile_bp)
|
||||
register_cli(app)
|
||||
if app.config.get("TESTING"):
|
||||
reset_login_rate_limits()
|
||||
reset_profile_password_rate_limits()
|
||||
should_start_workers = (
|
||||
app.config.get("JOB_WORKER_ENABLED")
|
||||
and not app.config.get("TESTING")
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
from datetime import datetime
|
||||
from functools import wraps
|
||||
from typing import Callable, TypeVar
|
||||
from urllib.parse import quote, unquote
|
||||
|
|
@ -21,6 +22,17 @@ def verify_password(raw: str, digest: str) -> bool:
|
|||
return check_password_hash(digest, raw)
|
||||
|
||||
|
||||
MIN_PASSWORD_LENGTH = 8
|
||||
|
||||
|
||||
def validate_new_password(raw: str) -> str | None:
|
||||
if raw == "":
|
||||
return "password must not be empty"
|
||||
if len(raw) < MIN_PASSWORD_LENGTH:
|
||||
return f"password must be at least {MIN_PASSWORD_LENGTH} characters"
|
||||
return None
|
||||
|
||||
|
||||
def load_current_user() -> None:
|
||||
user_id = session.get("user_id")
|
||||
if user_id is None:
|
||||
|
|
@ -28,17 +40,38 @@ def load_current_user() -> None:
|
|||
return
|
||||
with session_scope() as db:
|
||||
user = db.scalar(select(User).where(User.id == int(user_id)))
|
||||
# Treat deactivated users as logged-out so existing sessions stop
|
||||
# working as soon as an admin flips active=False.
|
||||
g.user = user if user is not None and user.active else None
|
||||
if user is None or not user.active:
|
||||
g.user = None
|
||||
return
|
||||
|
||||
marker = session.get("pw_changed_at")
|
||||
if marker is None:
|
||||
g.user = None
|
||||
return
|
||||
try:
|
||||
marker_dt = datetime.fromisoformat(marker)
|
||||
except ValueError:
|
||||
g.user = None
|
||||
return
|
||||
# user.password_changed_at comes back naive from SQLite; strip tz from the
|
||||
# marker so an aware-marker session (just stamped from an in-memory user)
|
||||
# compares cleanly with a freshly-loaded user row.
|
||||
if marker_dt.tzinfo is not None:
|
||||
marker_dt = marker_dt.replace(tzinfo=None)
|
||||
if marker_dt < user.password_changed_at:
|
||||
g.user = None
|
||||
return
|
||||
|
||||
g.user = user
|
||||
|
||||
|
||||
def current_user() -> User | None:
|
||||
return getattr(g, "user", None)
|
||||
|
||||
|
||||
def login_user(user_id: int) -> None:
|
||||
def login_user(user_id: int, password_changed_at) -> None:
|
||||
session["user_id"] = user_id
|
||||
session["pw_changed_at"] = password_changed_at.isoformat()
|
||||
|
||||
|
||||
def logout_user() -> None:
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import click
|
|||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.auth import hash_password
|
||||
from l4d2web.auth import hash_password, validate_new_password
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import Overlay, User
|
||||
from l4d2web.services.overlay_creation import (
|
||||
|
|
@ -31,8 +31,9 @@ def create_user(username: str, admin: bool) -> None:
|
|||
password = os.getenv("LEFT4ME_ADMIN_PASSWORD")
|
||||
if password is None:
|
||||
password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
|
||||
if password == "":
|
||||
raise click.ClickException("password must not be empty")
|
||||
policy_error = validate_new_password(password)
|
||||
if policy_error is not None:
|
||||
raise click.ClickException(policy_error)
|
||||
|
||||
try:
|
||||
with session_scope() as db:
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ class User(Base):
|
|||
)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||
password_changed_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||
|
||||
|
||||
class Overlay(Base):
|
||||
|
|
|
|||
|
|
@ -1,16 +1,15 @@
|
|||
import time
|
||||
|
||||
from flask import Blueprint, Response, redirect, render_template, request
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import User
|
||||
from l4d2web.services.rate_limit import check_rate_limit
|
||||
|
||||
|
||||
bp = Blueprint("auth", __name__)
|
||||
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
|
||||
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
|
||||
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
||||
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||
|
||||
|
|
@ -20,14 +19,12 @@ def reset_login_rate_limits() -> None:
|
|||
|
||||
|
||||
def is_login_rate_limited(remote_addr: str) -> bool:
|
||||
now = time.time()
|
||||
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, [])
|
||||
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS
|
||||
attempts[:] = [ts for ts in attempts if ts >= cutoff]
|
||||
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
|
||||
return True
|
||||
attempts.append(now)
|
||||
return False
|
||||
return check_rate_limit(
|
||||
LOGIN_ATTEMPTS_BY_IP,
|
||||
remote_addr,
|
||||
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
|
||||
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
)
|
||||
|
||||
|
||||
@bp.get("/login")
|
||||
|
|
@ -52,7 +49,7 @@ def login() -> Response:
|
|||
# Same generic response for missing user, wrong password, or
|
||||
# deactivated account — no timing oracle for deactivation status.
|
||||
return Response("invalid credentials", status=401)
|
||||
login_user(user.id)
|
||||
login_user(user.id, user.password_changed_at)
|
||||
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
||||
next_target = request.form.get("next", "")
|
||||
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
||||
|
|
|
|||
93
l4d2web/routes/profile_routes.py
Normal file
93
l4d2web/routes/profile_routes.py
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
from flask import Blueprint, Response, redirect, render_template, request, session
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.auth import (
|
||||
current_user,
|
||||
hash_password,
|
||||
require_login,
|
||||
validate_new_password,
|
||||
verify_password,
|
||||
)
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import User, now_utc
|
||||
from l4d2web.services.rate_limit import check_rate_limit
|
||||
|
||||
|
||||
bp = Blueprint("profile", __name__)
|
||||
|
||||
|
||||
_ERROR_MESSAGES = {
|
||||
"fields_required": "Fill in all three fields.",
|
||||
"mismatch": "New password and confirmation do not match.",
|
||||
"too_short": "New password must be at least 8 characters.",
|
||||
"empty": "New password must not be empty.",
|
||||
"wrong_current": "Current password is incorrect.",
|
||||
}
|
||||
|
||||
|
||||
PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS = 10
|
||||
PROFILE_PW_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||
|
||||
|
||||
def reset_profile_password_rate_limits() -> None:
|
||||
PROFILE_PW_ATTEMPTS_BY_IP.clear()
|
||||
|
||||
|
||||
def _redirect_with_error(error_key: str) -> Response:
|
||||
return redirect(f"/profile?error={error_key}")
|
||||
|
||||
|
||||
@bp.get("/profile")
|
||||
@require_login
|
||||
def profile_page() -> str:
|
||||
error_key = request.args.get("error", "")
|
||||
success = request.args.get("success") == "1"
|
||||
return render_template(
|
||||
"profile.html",
|
||||
error_message=_ERROR_MESSAGES.get(error_key, ""),
|
||||
success=success,
|
||||
)
|
||||
|
||||
|
||||
@bp.post("/profile/password")
|
||||
@require_login
|
||||
def profile_password_change() -> Response:
|
||||
remote_addr = request.remote_addr or "unknown"
|
||||
if check_rate_limit(
|
||||
PROFILE_PW_ATTEMPTS_BY_IP,
|
||||
remote_addr,
|
||||
window=PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS,
|
||||
max_attempts=PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
):
|
||||
return Response("too many attempts", status=429)
|
||||
|
||||
current_password = request.form.get("current_password", "")
|
||||
new_password = request.form.get("new_password", "")
|
||||
confirm_new_password = request.form.get("confirm_new_password", "")
|
||||
|
||||
if not current_password or not new_password or not confirm_new_password:
|
||||
return _redirect_with_error("fields_required")
|
||||
if new_password != confirm_new_password:
|
||||
return _redirect_with_error("mismatch")
|
||||
policy_error = validate_new_password(new_password)
|
||||
if policy_error is not None:
|
||||
error_key = "empty" if policy_error == "password must not be empty" else "too_short"
|
||||
return _redirect_with_error(error_key)
|
||||
|
||||
actor = current_user()
|
||||
assert actor is not None # @require_login
|
||||
|
||||
with session_scope() as db:
|
||||
user = db.scalar(select(User).where(User.id == actor.id))
|
||||
if user is None or not verify_password(current_password, user.password_digest):
|
||||
return _redirect_with_error("wrong_current")
|
||||
user.password_digest = hash_password(new_password)
|
||||
# Strip tz so the marker matches what a subsequent DB read returns
|
||||
# (SQLite DateTime columns don't preserve tzinfo).
|
||||
user.password_changed_at = now_utc().replace(tzinfo=None)
|
||||
new_marker = user.password_changed_at.isoformat()
|
||||
|
||||
session["pw_changed_at"] = new_marker
|
||||
|
||||
return redirect("/profile?success=1")
|
||||
23
l4d2web/services/rate_limit.py
Normal file
23
l4d2web/services/rate_limit.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
import time
|
||||
|
||||
|
||||
def check_rate_limit(
|
||||
bucket: dict[str, list[float]],
|
||||
remote_addr: str,
|
||||
*,
|
||||
window: float,
|
||||
max_attempts: int,
|
||||
) -> bool:
|
||||
"""Return True if this request should be rate-limited (rejected).
|
||||
|
||||
The bucket is owned by the caller so each endpoint can keep an
|
||||
independent count and tests can reset state cleanly.
|
||||
"""
|
||||
now = time.time()
|
||||
attempts = bucket.setdefault(remote_addr, [])
|
||||
cutoff = now - window
|
||||
attempts[:] = [ts for ts in attempts if ts >= cutoff]
|
||||
if len(attempts) >= max_attempts:
|
||||
return True
|
||||
attempts.append(now)
|
||||
return False
|
||||
|
|
@ -24,7 +24,7 @@
|
|||
{% if g.user %}
|
||||
<nav class="account-nav" aria-label="Account navigation">
|
||||
{% if g.user.admin %}<a href="/admin">admin</a>{% endif %}
|
||||
<span class="muted">{{ g.user.username }}</span>
|
||||
<a class="muted" href="/profile">{{ g.user.username }}</a>
|
||||
<form method="post" action="/logout" class="inline-form">
|
||||
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
|
||||
<button class="link-button" type="submit">logout</button>
|
||||
|
|
|
|||
33
l4d2web/templates/profile.html
Normal file
33
l4d2web/templates/profile.html
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Profile | left4me{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<section class="panel">
|
||||
<h1>Change password</h1>
|
||||
|
||||
{% if success %}
|
||||
<p class="muted">Password changed.</p>
|
||||
{% endif %}
|
||||
{% if error_message %}
|
||||
<p class="error">{{ error_message }}</p>
|
||||
{% endif %}
|
||||
|
||||
<form method="post" action="/profile/password" class="form">
|
||||
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
|
||||
<label>
|
||||
Current password
|
||||
<input type="password" name="current_password" autocomplete="current-password" required>
|
||||
</label>
|
||||
<label>
|
||||
New password
|
||||
<input type="password" name="new_password" autocomplete="new-password" required>
|
||||
</label>
|
||||
<label>
|
||||
Confirm new password
|
||||
<input type="password" name="confirm_new_password" autocomplete="new-password" required>
|
||||
</label>
|
||||
<button type="submit">Change password</button>
|
||||
</form>
|
||||
</section>
|
||||
{% endblock %}
|
||||
|
|
@ -25,10 +25,12 @@ def admin_client(tmp_path, monkeypatch):
|
|||
db.add_all([admin, second_admin])
|
||||
db.flush()
|
||||
admin_id = admin.id
|
||||
admin_marker = admin.password_changed_at.isoformat()
|
||||
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = admin_marker
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client, admin_id
|
||||
|
||||
|
|
@ -124,11 +126,15 @@ def test_deactivated_user_existing_session_invalidated(admin_client):
|
|||
"""An active session at the moment of deactivation stops working."""
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
with session_scope() as db:
|
||||
bob = db.scalar(select(User).where(User.id == target))
|
||||
bob_marker = bob.password_changed_at.isoformat()
|
||||
|
||||
# Forge a session for bob.
|
||||
bob_client = client.application.test_client()
|
||||
with bob_client.session_transaction() as sess:
|
||||
sess["user_id"] = target
|
||||
sess["pw_changed_at"] = bob_marker
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
# Sanity: bob can hit a logged-in route.
|
||||
|
|
|
|||
|
|
@ -134,10 +134,75 @@ def test_login_sets_session(client) -> None:
|
|||
assert sess.get("user_id") is not None
|
||||
|
||||
|
||||
def test_load_current_user_rejects_missing_marker(client) -> None:
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("secret"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
|
||||
response = client.get("/dashboard")
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_load_current_user_rejects_stale_marker(client) -> None:
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("secret"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
|
||||
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = stale.isoformat()
|
||||
|
||||
response = client.get("/dashboard")
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_load_current_user_accepts_current_marker(client) -> None:
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("secret"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
marker = u.password_changed_at.isoformat()
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = marker
|
||||
|
||||
response = client.get("/dashboard")
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_login_stamps_password_changed_at_in_session(client) -> None:
|
||||
with session_scope() as session:
|
||||
session.add(User(username="alice", password_digest=hash_password("secret")))
|
||||
|
||||
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
||||
assert response.status_code == 302
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
marker = sess.get("pw_changed_at")
|
||||
assert marker is not None
|
||||
with session_scope() as session:
|
||||
user = session.query(User).filter_by(username="alice").one()
|
||||
assert marker == user.password_changed_at.isoformat()
|
||||
|
||||
|
||||
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
|
|
@ -150,6 +215,19 @@ def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> Non
|
|||
assert user.admin is True
|
||||
|
||||
|
||||
def test_create_user_cli_rejects_short_password(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'short_pw.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "short7x")
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert "at least 8" in result.output
|
||||
|
||||
|
||||
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
|
|
@ -163,10 +241,26 @@ def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatc
|
|||
assert "password must not be empty" in result.output
|
||||
|
||||
|
||||
def test_validate_new_password_rejects_empty():
|
||||
from l4d2web.auth import validate_new_password
|
||||
assert validate_new_password("") == "password must not be empty"
|
||||
|
||||
|
||||
def test_validate_new_password_rejects_short():
|
||||
from l4d2web.auth import MIN_PASSWORD_LENGTH, validate_new_password
|
||||
assert MIN_PASSWORD_LENGTH == 8
|
||||
assert validate_new_password("a" * 7) == "password must be at least 8 characters"
|
||||
|
||||
|
||||
def test_validate_new_password_accepts_min_length():
|
||||
from l4d2web.auth import validate_new_password
|
||||
assert validate_new_password("a" * 8) is None
|
||||
|
||||
|
||||
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
with session_scope() as session:
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import json
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
|
@ -31,6 +32,7 @@ def user_client(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
@ -58,6 +60,7 @@ def linked_blueprint(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
return client, blueprint_id
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
from pathlib import Path
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
|
|
@ -66,6 +67,7 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
||||
assert response.status_code == 200
|
||||
|
|
@ -76,6 +78,7 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
||||
text = response.get_data(as_text=True)
|
||||
|
|
@ -91,6 +94,7 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
|
||||
text = response.get_data(as_text=True)
|
||||
|
|
|
|||
|
|
@ -18,3 +18,23 @@ def test_create_user_and_blueprint(tmp_path, monkeypatch) -> None:
|
|||
|
||||
assert user.id is not None
|
||||
assert blueprint.id is not None
|
||||
|
||||
|
||||
def test_user_has_password_changed_at_default(tmp_path, monkeypatch):
|
||||
from datetime import UTC, datetime
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
|
||||
db_url = f"sqlite:///{tmp_path/'pw.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
before = datetime.now(UTC).replace(tzinfo=None)
|
||||
with session_scope() as db:
|
||||
db.add(User(username="alice", password_digest=hash_password("secret")))
|
||||
with session_scope() as db:
|
||||
user = db.query(User).filter_by(username="alice").one()
|
||||
|
||||
assert user.password_changed_at is not None
|
||||
assert user.password_changed_at >= before
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
|
||||
from __future__ import annotations
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import io
|
||||
import os
|
||||
|
|
@ -34,6 +35,7 @@ def _client_for(app, user_id: int):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from l4d2web.auth import hash_password
|
|||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
|
||||
from l4d2web.services.security import validate_overlay_ref
|
||||
from datetime import UTC, datetime
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
@ -23,6 +24,7 @@ def admin_client(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
@ -48,6 +50,7 @@ def user_client_with_overlay(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
@ -146,6 +149,7 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc
|
|||
c = app.test_client()
|
||||
with c.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return c
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import pytest
|
||||
from pathlib import Path
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
|
|
@ -35,6 +36,7 @@ def auth_client_with_server(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
return client
|
||||
|
||||
|
||||
|
|
@ -61,6 +63,7 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = owner_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
return client, blueprint_id
|
||||
|
||||
|
|
@ -345,6 +348,7 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
admin_page = client.get("/admin")
|
||||
assert admin_page.status_code == 200
|
||||
|
|
@ -380,6 +384,7 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}")
|
||||
|
||||
|
|
@ -402,6 +407,7 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
|
||||
|
|
@ -457,6 +463,7 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None:
|
|||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
logged_in_response = client.get("/")
|
||||
assert logged_in_response.status_code == 302
|
||||
|
|
@ -514,6 +521,7 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
|
|||
admin_client = app.test_client()
|
||||
with admin_client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
with session_scope() as db:
|
||||
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
|
||||
|
|
@ -541,6 +549,7 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
|
|||
user_client = app.test_client()
|
||||
with user_client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
with session_scope() as db:
|
||||
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
|
||||
|
|
@ -717,6 +726,7 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = other_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/overlays/{overlay_id}/jobs")
|
||||
assert response.status_code == 403
|
||||
|
|
|
|||
258
l4d2web/tests/test_profile.py
Normal file
258
l4d2web/tests/test_profile.py
Normal file
|
|
@ -0,0 +1,258 @@
|
|||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app_and_user(tmp_path, monkeypatch):
|
||||
db_url = f"sqlite:///{tmp_path/'profile.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("currentpass"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
marker = u.password_changed_at.isoformat()
|
||||
return app, uid, marker
|
||||
|
||||
|
||||
def _logged_in_client(app, uid, marker):
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = marker
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
||||
def test_profile_requires_login(app_and_user):
|
||||
app, _, _ = app_and_user
|
||||
response = app.test_client().get("/profile", follow_redirects=False)
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_profile_page_renders(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
response = client.get("/profile")
|
||||
assert response.status_code == 200
|
||||
body = response.get_data(as_text=True)
|
||||
assert "Change password" in body
|
||||
assert 'name="current_password"' in body
|
||||
assert 'name="new_password"' in body
|
||||
assert 'name="confirm_new_password"' in body
|
||||
|
||||
|
||||
def test_base_template_links_username_to_profile(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
response = client.get("/dashboard")
|
||||
body = response.get_data(as_text=True)
|
||||
assert 'href="/profile"' in body
|
||||
assert ">alice<" in body
|
||||
|
||||
|
||||
def _post_pw(client, **fields):
|
||||
payload = {"csrf_token": "test-token", **fields}
|
||||
return client.post("/profile/password", data=payload, follow_redirects=False)
|
||||
|
||||
|
||||
def _digest_of(username):
|
||||
with session_scope() as db:
|
||||
u = db.scalar(select(User).where(User.username == username))
|
||||
return u.password_digest
|
||||
|
||||
|
||||
def test_post_password_rejects_missing_csrf(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = client.post(
|
||||
"/profile/password",
|
||||
data={
|
||||
"current_password": "currentpass",
|
||||
"new_password": "newpass12",
|
||||
"confirm_new_password": "newpass12",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_requires_login(app_and_user):
|
||||
app, _, _ = app_and_user
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["csrf_token"] = "test-token"
|
||||
response = client.post(
|
||||
"/profile/password",
|
||||
data={
|
||||
"csrf_token": "test-token",
|
||||
"current_password": "x",
|
||||
"new_password": "y" * 8,
|
||||
"confirm_new_password": "y" * 8,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_post_password_empty_fields_redirects_with_error(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=fields_required" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_mismatched_confirm(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass99",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=mismatch" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_too_short(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="short7x",
|
||||
confirm_new_password="short7x",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=too_short" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_wrong_current(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="WRONG",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=wrong_current" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_happy_path_rotates_and_keeps_current_session(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before_digest = _digest_of("alice")
|
||||
with session_scope() as db:
|
||||
before_marker = db.scalar(select(User).where(User.id == uid)).password_changed_at
|
||||
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.headers["Location"].endswith("/profile?success=1")
|
||||
|
||||
after_digest = _digest_of("alice")
|
||||
assert after_digest != before_digest
|
||||
with session_scope() as db:
|
||||
u = db.scalar(select(User).where(User.id == uid))
|
||||
assert u.password_changed_at > before_marker
|
||||
new_marker = u.password_changed_at.isoformat()
|
||||
|
||||
follow = client.get("/dashboard")
|
||||
assert follow.status_code == 200
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
assert sess["pw_changed_at"] == new_marker
|
||||
|
||||
|
||||
def test_other_session_dies_after_password_change(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
primary = _logged_in_client(app, uid, marker)
|
||||
other = _logged_in_client(app, uid, marker)
|
||||
|
||||
pre = other.get("/dashboard")
|
||||
assert pre.status_code == 200
|
||||
|
||||
response = _post_pw(
|
||||
primary,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
|
||||
post = other.get("/dashboard", follow_redirects=False)
|
||||
assert post.status_code == 302
|
||||
assert "/login" in post.headers["Location"]
|
||||
|
||||
|
||||
def test_new_password_works_for_login(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
_post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
|
||||
fresh = app.test_client()
|
||||
response = fresh.post(
|
||||
"/login",
|
||||
data={"username": "alice", "password": "newpass12"},
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert response.headers["Location"].endswith("/dashboard")
|
||||
|
||||
|
||||
def test_post_password_rate_limited(app_and_user):
|
||||
from l4d2web.routes.profile_routes import (
|
||||
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
reset_profile_password_rate_limits,
|
||||
)
|
||||
reset_profile_password_rate_limits()
|
||||
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
|
||||
for _ in range(PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS):
|
||||
_post_pw(
|
||||
client,
|
||||
current_password="WRONG",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
|
||||
blocked = _post_pw(
|
||||
client,
|
||||
current_password="WRONG",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
assert blocked.status_code == 429
|
||||
31
l4d2web/tests/test_rate_limit.py
Normal file
31
l4d2web/tests/test_rate_limit.py
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
import time
|
||||
|
||||
from l4d2web.services.rate_limit import check_rate_limit
|
||||
|
||||
|
||||
def test_under_threshold_allows():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(3):
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
|
||||
|
||||
|
||||
def test_at_threshold_blocks():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(5):
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is True
|
||||
|
||||
|
||||
def test_other_ips_independent():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(5):
|
||||
check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5)
|
||||
assert check_rate_limit(bucket, "5.6.7.8", window=60.0, max_attempts=5) is False
|
||||
|
||||
|
||||
def test_old_attempts_expire():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(5):
|
||||
check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5)
|
||||
time.sleep(0.1)
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) is False
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
"""Routes for type='script' overlays: create, /script (update body),
|
||||
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
|
||||
from __future__ import annotations
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
|
@ -52,6 +53,7 @@ def _client_for(app, user_id: int):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import json
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -32,6 +33,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = payload["user_id"]
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
return client, payload
|
||||
|
|
@ -52,6 +54,7 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
response = client.get("/servers")
|
||||
|
|
@ -235,6 +238,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
|
|||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = alice_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
alice_resp = client.post(
|
||||
"/servers",
|
||||
|
|
@ -245,6 +249,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
|
|||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = bob_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
bob_resp = client.post(
|
||||
"/servers",
|
||||
|
|
@ -318,6 +323,7 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
first = client.post(
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import pytest
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
|
|
@ -32,6 +33,7 @@ def owner_client_with_server(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
return client, server_id
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
"""Tests for the workshop overlay routes (add items, remove items, build,
|
||||
admin refresh)."""
|
||||
from __future__ import annotations
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from typing import Iterable
|
||||
from unittest.mock import patch
|
||||
|
|
@ -54,6 +55,7 @@ def env_user(tmp_path, monkeypatch):
|
|||
c = app.test_client()
|
||||
with c.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return c
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue