Compare commits
13 commits
e1149704c8
...
cb52a69faf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb52a69faf | ||
|
|
f643246a84 | ||
|
|
224b023ca0 | ||
|
|
47722dbb19 | ||
|
|
d25fb57f30 | ||
|
|
eef85f36a9 | ||
|
|
e75f379dcb | ||
|
|
84dc672180 | ||
|
|
26a6a9d7b0 | ||
|
|
a5982941df | ||
|
|
2353378b23 | ||
|
|
eb1f2b82eb | ||
|
|
6eb9bd0ab3 |
25 changed files with 1943 additions and 22 deletions
1260
docs/superpowers/plans/2026-05-11-profile-password-change-v1.md
Normal file
1260
docs/superpowers/plans/2026-05-11-profile-password-change-v1.md
Normal file
File diff suppressed because it is too large
Load diff
32
l4d2web/alembic/versions/0009_user_password_changed_at.py
Normal file
32
l4d2web/alembic/versions/0009_user_password_changed_at.py
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
"""users.password_changed_at
|
||||||
|
|
||||||
|
Revision ID: 0009_user_password_changed_at
|
||||||
|
Revises: 0008_user_active
|
||||||
|
Create Date: 2026-05-11
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
|
||||||
|
revision: str = "0009_user_password_changed_at"
|
||||||
|
down_revision: Union[str, Sequence[str], None] = "0008_user_active"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
with op.batch_alter_table("users") as batch_op:
|
||||||
|
batch_op.add_column(sa.Column("password_changed_at", sa.DateTime(), nullable=True))
|
||||||
|
op.execute(
|
||||||
|
"UPDATE users SET password_changed_at = created_at "
|
||||||
|
"WHERE password_changed_at IS NULL"
|
||||||
|
)
|
||||||
|
with op.batch_alter_table("users") as batch_op:
|
||||||
|
batch_op.alter_column("password_changed_at", nullable=False)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
with op.batch_alter_table("users") as batch_op:
|
||||||
|
batch_op.drop_column("password_changed_at")
|
||||||
|
|
@ -16,6 +16,8 @@ from l4d2web.routes.job_routes import bp as job_bp
|
||||||
from l4d2web.routes.log_routes import bp as log_bp
|
from l4d2web.routes.log_routes import bp as log_bp
|
||||||
from l4d2web.routes.overlay_routes import bp as overlay_bp
|
from l4d2web.routes.overlay_routes import bp as overlay_bp
|
||||||
from l4d2web.routes.page_routes import bp as page_bp
|
from l4d2web.routes.page_routes import bp as page_bp
|
||||||
|
from l4d2web.routes.profile_routes import bp as profile_bp
|
||||||
|
from l4d2web.routes.profile_routes import reset_profile_password_rate_limits
|
||||||
from l4d2web.routes.server_routes import bp as server_bp
|
from l4d2web.routes.server_routes import bp as server_bp
|
||||||
from l4d2web.routes.workshop_routes import bp as workshop_bp
|
from l4d2web.routes.workshop_routes import bp as workshop_bp
|
||||||
from l4d2web.services.job_worker import (
|
from l4d2web.services.job_worker import (
|
||||||
|
|
@ -82,9 +84,11 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
||||||
app.register_blueprint(job_bp)
|
app.register_blueprint(job_bp)
|
||||||
app.register_blueprint(log_bp)
|
app.register_blueprint(log_bp)
|
||||||
app.register_blueprint(page_bp)
|
app.register_blueprint(page_bp)
|
||||||
|
app.register_blueprint(profile_bp)
|
||||||
register_cli(app)
|
register_cli(app)
|
||||||
if app.config.get("TESTING"):
|
if app.config.get("TESTING"):
|
||||||
reset_login_rate_limits()
|
reset_login_rate_limits()
|
||||||
|
reset_profile_password_rate_limits()
|
||||||
should_start_workers = (
|
should_start_workers = (
|
||||||
app.config.get("JOB_WORKER_ENABLED")
|
app.config.get("JOB_WORKER_ENABLED")
|
||||||
and not app.config.get("TESTING")
|
and not app.config.get("TESTING")
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
from datetime import datetime
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from typing import Callable, TypeVar
|
from typing import Callable, TypeVar
|
||||||
from urllib.parse import quote, unquote
|
from urllib.parse import quote, unquote
|
||||||
|
|
@ -21,6 +22,17 @@ def verify_password(raw: str, digest: str) -> bool:
|
||||||
return check_password_hash(digest, raw)
|
return check_password_hash(digest, raw)
|
||||||
|
|
||||||
|
|
||||||
|
MIN_PASSWORD_LENGTH = 8
|
||||||
|
|
||||||
|
|
||||||
|
def validate_new_password(raw: str) -> str | None:
|
||||||
|
if raw == "":
|
||||||
|
return "password must not be empty"
|
||||||
|
if len(raw) < MIN_PASSWORD_LENGTH:
|
||||||
|
return f"password must be at least {MIN_PASSWORD_LENGTH} characters"
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def load_current_user() -> None:
|
def load_current_user() -> None:
|
||||||
user_id = session.get("user_id")
|
user_id = session.get("user_id")
|
||||||
if user_id is None:
|
if user_id is None:
|
||||||
|
|
@ -28,17 +40,38 @@ def load_current_user() -> None:
|
||||||
return
|
return
|
||||||
with session_scope() as db:
|
with session_scope() as db:
|
||||||
user = db.scalar(select(User).where(User.id == int(user_id)))
|
user = db.scalar(select(User).where(User.id == int(user_id)))
|
||||||
# Treat deactivated users as logged-out so existing sessions stop
|
if user is None or not user.active:
|
||||||
# working as soon as an admin flips active=False.
|
g.user = None
|
||||||
g.user = user if user is not None and user.active else None
|
return
|
||||||
|
|
||||||
|
marker = session.get("pw_changed_at")
|
||||||
|
if marker is None:
|
||||||
|
g.user = None
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
marker_dt = datetime.fromisoformat(marker)
|
||||||
|
except ValueError:
|
||||||
|
g.user = None
|
||||||
|
return
|
||||||
|
# user.password_changed_at comes back naive from SQLite; strip tz from the
|
||||||
|
# marker so an aware-marker session (just stamped from an in-memory user)
|
||||||
|
# compares cleanly with a freshly-loaded user row.
|
||||||
|
if marker_dt.tzinfo is not None:
|
||||||
|
marker_dt = marker_dt.replace(tzinfo=None)
|
||||||
|
if marker_dt < user.password_changed_at:
|
||||||
|
g.user = None
|
||||||
|
return
|
||||||
|
|
||||||
|
g.user = user
|
||||||
|
|
||||||
|
|
||||||
def current_user() -> User | None:
|
def current_user() -> User | None:
|
||||||
return getattr(g, "user", None)
|
return getattr(g, "user", None)
|
||||||
|
|
||||||
|
|
||||||
def login_user(user_id: int) -> None:
|
def login_user(user_id: int, password_changed_at) -> None:
|
||||||
session["user_id"] = user_id
|
session["user_id"] = user_id
|
||||||
|
session["pw_changed_at"] = password_changed_at.isoformat()
|
||||||
|
|
||||||
|
|
||||||
def logout_user() -> None:
|
def logout_user() -> None:
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ import click
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from l4d2web.auth import hash_password
|
from l4d2web.auth import hash_password, validate_new_password
|
||||||
from l4d2web.db import session_scope
|
from l4d2web.db import session_scope
|
||||||
from l4d2web.models import Overlay, User
|
from l4d2web.models import Overlay, User
|
||||||
from l4d2web.services.overlay_creation import (
|
from l4d2web.services.overlay_creation import (
|
||||||
|
|
@ -31,8 +31,9 @@ def create_user(username: str, admin: bool) -> None:
|
||||||
password = os.getenv("LEFT4ME_ADMIN_PASSWORD")
|
password = os.getenv("LEFT4ME_ADMIN_PASSWORD")
|
||||||
if password is None:
|
if password is None:
|
||||||
password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
|
password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
|
||||||
if password == "":
|
policy_error = validate_new_password(password)
|
||||||
raise click.ClickException("password must not be empty")
|
if policy_error is not None:
|
||||||
|
raise click.ClickException(policy_error)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with session_scope() as db:
|
with session_scope() as db:
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ class User(Base):
|
||||||
)
|
)
|
||||||
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||||
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||||
|
password_changed_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||||
|
|
||||||
|
|
||||||
class Overlay(Base):
|
class Overlay(Base):
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,15 @@
|
||||||
import time
|
|
||||||
|
|
||||||
from flask import Blueprint, Response, redirect, render_template, request
|
from flask import Blueprint, Response, redirect, render_template, request
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
|
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
|
||||||
from l4d2web.db import session_scope
|
from l4d2web.db import session_scope
|
||||||
from l4d2web.models import User
|
from l4d2web.models import User
|
||||||
|
from l4d2web.services.rate_limit import check_rate_limit
|
||||||
|
|
||||||
|
|
||||||
bp = Blueprint("auth", __name__)
|
bp = Blueprint("auth", __name__)
|
||||||
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
|
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
|
||||||
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
|
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||||
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
||||||
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||||
|
|
||||||
|
|
@ -20,14 +19,12 @@ def reset_login_rate_limits() -> None:
|
||||||
|
|
||||||
|
|
||||||
def is_login_rate_limited(remote_addr: str) -> bool:
|
def is_login_rate_limited(remote_addr: str) -> bool:
|
||||||
now = time.time()
|
return check_rate_limit(
|
||||||
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, [])
|
LOGIN_ATTEMPTS_BY_IP,
|
||||||
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS
|
remote_addr,
|
||||||
attempts[:] = [ts for ts in attempts if ts >= cutoff]
|
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
|
||||||
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
|
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
|
||||||
return True
|
)
|
||||||
attempts.append(now)
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@bp.get("/login")
|
@bp.get("/login")
|
||||||
|
|
@ -52,7 +49,7 @@ def login() -> Response:
|
||||||
# Same generic response for missing user, wrong password, or
|
# Same generic response for missing user, wrong password, or
|
||||||
# deactivated account — no timing oracle for deactivation status.
|
# deactivated account — no timing oracle for deactivation status.
|
||||||
return Response("invalid credentials", status=401)
|
return Response("invalid credentials", status=401)
|
||||||
login_user(user.id)
|
login_user(user.id, user.password_changed_at)
|
||||||
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
||||||
next_target = request.form.get("next", "")
|
next_target = request.form.get("next", "")
|
||||||
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
||||||
|
|
|
||||||
93
l4d2web/routes/profile_routes.py
Normal file
93
l4d2web/routes/profile_routes.py
Normal file
|
|
@ -0,0 +1,93 @@
|
||||||
|
from flask import Blueprint, Response, redirect, render_template, request, session
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
from l4d2web.auth import (
|
||||||
|
current_user,
|
||||||
|
hash_password,
|
||||||
|
require_login,
|
||||||
|
validate_new_password,
|
||||||
|
verify_password,
|
||||||
|
)
|
||||||
|
from l4d2web.db import session_scope
|
||||||
|
from l4d2web.models import User, now_utc
|
||||||
|
from l4d2web.services.rate_limit import check_rate_limit
|
||||||
|
|
||||||
|
|
||||||
|
bp = Blueprint("profile", __name__)
|
||||||
|
|
||||||
|
|
||||||
|
_ERROR_MESSAGES = {
|
||||||
|
"fields_required": "Fill in all three fields.",
|
||||||
|
"mismatch": "New password and confirmation do not match.",
|
||||||
|
"too_short": "New password must be at least 8 characters.",
|
||||||
|
"empty": "New password must not be empty.",
|
||||||
|
"wrong_current": "Current password is incorrect.",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||||
|
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS = 10
|
||||||
|
PROFILE_PW_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def reset_profile_password_rate_limits() -> None:
|
||||||
|
PROFILE_PW_ATTEMPTS_BY_IP.clear()
|
||||||
|
|
||||||
|
|
||||||
|
def _redirect_with_error(error_key: str) -> Response:
|
||||||
|
return redirect(f"/profile?error={error_key}")
|
||||||
|
|
||||||
|
|
||||||
|
@bp.get("/profile")
|
||||||
|
@require_login
|
||||||
|
def profile_page() -> str:
|
||||||
|
error_key = request.args.get("error", "")
|
||||||
|
success = request.args.get("success") == "1"
|
||||||
|
return render_template(
|
||||||
|
"profile.html",
|
||||||
|
error_message=_ERROR_MESSAGES.get(error_key, ""),
|
||||||
|
success=success,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@bp.post("/profile/password")
|
||||||
|
@require_login
|
||||||
|
def profile_password_change() -> Response:
|
||||||
|
remote_addr = request.remote_addr or "unknown"
|
||||||
|
if check_rate_limit(
|
||||||
|
PROFILE_PW_ATTEMPTS_BY_IP,
|
||||||
|
remote_addr,
|
||||||
|
window=PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS,
|
||||||
|
max_attempts=PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
|
||||||
|
):
|
||||||
|
return Response("too many attempts", status=429)
|
||||||
|
|
||||||
|
current_password = request.form.get("current_password", "")
|
||||||
|
new_password = request.form.get("new_password", "")
|
||||||
|
confirm_new_password = request.form.get("confirm_new_password", "")
|
||||||
|
|
||||||
|
if not current_password or not new_password or not confirm_new_password:
|
||||||
|
return _redirect_with_error("fields_required")
|
||||||
|
if new_password != confirm_new_password:
|
||||||
|
return _redirect_with_error("mismatch")
|
||||||
|
policy_error = validate_new_password(new_password)
|
||||||
|
if policy_error is not None:
|
||||||
|
error_key = "empty" if policy_error == "password must not be empty" else "too_short"
|
||||||
|
return _redirect_with_error(error_key)
|
||||||
|
|
||||||
|
actor = current_user()
|
||||||
|
assert actor is not None # @require_login
|
||||||
|
|
||||||
|
with session_scope() as db:
|
||||||
|
user = db.scalar(select(User).where(User.id == actor.id))
|
||||||
|
if user is None or not verify_password(current_password, user.password_digest):
|
||||||
|
return _redirect_with_error("wrong_current")
|
||||||
|
user.password_digest = hash_password(new_password)
|
||||||
|
# Strip tz so the marker matches what a subsequent DB read returns
|
||||||
|
# (SQLite DateTime columns don't preserve tzinfo).
|
||||||
|
user.password_changed_at = now_utc().replace(tzinfo=None)
|
||||||
|
new_marker = user.password_changed_at.isoformat()
|
||||||
|
|
||||||
|
session["pw_changed_at"] = new_marker
|
||||||
|
|
||||||
|
return redirect("/profile?success=1")
|
||||||
23
l4d2web/services/rate_limit.py
Normal file
23
l4d2web/services/rate_limit.py
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
def check_rate_limit(
|
||||||
|
bucket: dict[str, list[float]],
|
||||||
|
remote_addr: str,
|
||||||
|
*,
|
||||||
|
window: float,
|
||||||
|
max_attempts: int,
|
||||||
|
) -> bool:
|
||||||
|
"""Return True if this request should be rate-limited (rejected).
|
||||||
|
|
||||||
|
The bucket is owned by the caller so each endpoint can keep an
|
||||||
|
independent count and tests can reset state cleanly.
|
||||||
|
"""
|
||||||
|
now = time.time()
|
||||||
|
attempts = bucket.setdefault(remote_addr, [])
|
||||||
|
cutoff = now - window
|
||||||
|
attempts[:] = [ts for ts in attempts if ts >= cutoff]
|
||||||
|
if len(attempts) >= max_attempts:
|
||||||
|
return True
|
||||||
|
attempts.append(now)
|
||||||
|
return False
|
||||||
|
|
@ -24,7 +24,7 @@
|
||||||
{% if g.user %}
|
{% if g.user %}
|
||||||
<nav class="account-nav" aria-label="Account navigation">
|
<nav class="account-nav" aria-label="Account navigation">
|
||||||
{% if g.user.admin %}<a href="/admin">admin</a>{% endif %}
|
{% if g.user.admin %}<a href="/admin">admin</a>{% endif %}
|
||||||
<span class="muted">{{ g.user.username }}</span>
|
<a class="muted" href="/profile">{{ g.user.username }}</a>
|
||||||
<form method="post" action="/logout" class="inline-form">
|
<form method="post" action="/logout" class="inline-form">
|
||||||
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
|
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
|
||||||
<button class="link-button" type="submit">logout</button>
|
<button class="link-button" type="submit">logout</button>
|
||||||
|
|
|
||||||
33
l4d2web/templates/profile.html
Normal file
33
l4d2web/templates/profile.html
Normal file
|
|
@ -0,0 +1,33 @@
|
||||||
|
{% extends "base.html" %}
|
||||||
|
|
||||||
|
{% block title %}Profile | left4me{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<section class="panel">
|
||||||
|
<h1>Change password</h1>
|
||||||
|
|
||||||
|
{% if success %}
|
||||||
|
<p class="muted">Password changed.</p>
|
||||||
|
{% endif %}
|
||||||
|
{% if error_message %}
|
||||||
|
<p class="error">{{ error_message }}</p>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<form method="post" action="/profile/password" class="form">
|
||||||
|
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
|
||||||
|
<label>
|
||||||
|
Current password
|
||||||
|
<input type="password" name="current_password" autocomplete="current-password" required>
|
||||||
|
</label>
|
||||||
|
<label>
|
||||||
|
New password
|
||||||
|
<input type="password" name="new_password" autocomplete="new-password" required>
|
||||||
|
</label>
|
||||||
|
<label>
|
||||||
|
Confirm new password
|
||||||
|
<input type="password" name="confirm_new_password" autocomplete="new-password" required>
|
||||||
|
</label>
|
||||||
|
<button type="submit">Change password</button>
|
||||||
|
</form>
|
||||||
|
</section>
|
||||||
|
{% endblock %}
|
||||||
|
|
@ -25,10 +25,12 @@ def admin_client(tmp_path, monkeypatch):
|
||||||
db.add_all([admin, second_admin])
|
db.add_all([admin, second_admin])
|
||||||
db.flush()
|
db.flush()
|
||||||
admin_id = admin.id
|
admin_id = admin.id
|
||||||
|
admin_marker = admin.password_changed_at.isoformat()
|
||||||
|
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = admin_id
|
sess["user_id"] = admin_id
|
||||||
|
sess["pw_changed_at"] = admin_marker
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return client, admin_id
|
return client, admin_id
|
||||||
|
|
||||||
|
|
@ -124,11 +126,15 @@ def test_deactivated_user_existing_session_invalidated(admin_client):
|
||||||
"""An active session at the moment of deactivation stops working."""
|
"""An active session at the moment of deactivation stops working."""
|
||||||
client, _ = admin_client
|
client, _ = admin_client
|
||||||
target = _add_user("bob")
|
target = _add_user("bob")
|
||||||
|
with session_scope() as db:
|
||||||
|
bob = db.scalar(select(User).where(User.id == target))
|
||||||
|
bob_marker = bob.password_changed_at.isoformat()
|
||||||
|
|
||||||
# Forge a session for bob.
|
# Forge a session for bob.
|
||||||
bob_client = client.application.test_client()
|
bob_client = client.application.test_client()
|
||||||
with bob_client.session_transaction() as sess:
|
with bob_client.session_transaction() as sess:
|
||||||
sess["user_id"] = target
|
sess["user_id"] = target
|
||||||
|
sess["pw_changed_at"] = bob_marker
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
|
|
||||||
# Sanity: bob can hit a logged-in route.
|
# Sanity: bob can hit a logged-in route.
|
||||||
|
|
|
||||||
|
|
@ -134,10 +134,75 @@ def test_login_sets_session(client) -> None:
|
||||||
assert sess.get("user_id") is not None
|
assert sess.get("user_id") is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_current_user_rejects_missing_marker(client) -> None:
|
||||||
|
with session_scope() as db:
|
||||||
|
u = User(username="alice", password_digest=hash_password("secret"))
|
||||||
|
db.add(u)
|
||||||
|
db.flush()
|
||||||
|
uid = u.id
|
||||||
|
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
sess["user_id"] = uid
|
||||||
|
|
||||||
|
response = client.get("/dashboard")
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/login" in response.headers["Location"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_current_user_rejects_stale_marker(client) -> None:
|
||||||
|
from datetime import UTC, datetime, timedelta
|
||||||
|
|
||||||
|
with session_scope() as db:
|
||||||
|
u = User(username="alice", password_digest=hash_password("secret"))
|
||||||
|
db.add(u)
|
||||||
|
db.flush()
|
||||||
|
uid = u.id
|
||||||
|
|
||||||
|
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
sess["user_id"] = uid
|
||||||
|
sess["pw_changed_at"] = stale.isoformat()
|
||||||
|
|
||||||
|
response = client.get("/dashboard")
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/login" in response.headers["Location"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_current_user_accepts_current_marker(client) -> None:
|
||||||
|
with session_scope() as db:
|
||||||
|
u = User(username="alice", password_digest=hash_password("secret"))
|
||||||
|
db.add(u)
|
||||||
|
db.flush()
|
||||||
|
uid = u.id
|
||||||
|
marker = u.password_changed_at.isoformat()
|
||||||
|
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
sess["user_id"] = uid
|
||||||
|
sess["pw_changed_at"] = marker
|
||||||
|
|
||||||
|
response = client.get("/dashboard")
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_login_stamps_password_changed_at_in_session(client) -> None:
|
||||||
|
with session_scope() as session:
|
||||||
|
session.add(User(username="alice", password_digest=hash_password("secret")))
|
||||||
|
|
||||||
|
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
||||||
|
assert response.status_code == 302
|
||||||
|
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
marker = sess.get("pw_changed_at")
|
||||||
|
assert marker is not None
|
||||||
|
with session_scope() as session:
|
||||||
|
user = session.query(User).filter_by(username="alice").one()
|
||||||
|
assert marker == user.password_changed_at.isoformat()
|
||||||
|
|
||||||
|
|
||||||
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
|
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
|
||||||
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
|
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
|
||||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
|
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
||||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||||
init_db()
|
init_db()
|
||||||
|
|
||||||
|
|
@ -150,6 +215,19 @@ def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> Non
|
||||||
assert user.admin is True
|
assert user.admin is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_user_cli_rejects_short_password(tmp_path, monkeypatch) -> None:
|
||||||
|
db_url = f"sqlite:///{tmp_path/'short_pw.db'}"
|
||||||
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
|
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "short7x")
|
||||||
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||||
|
init_db()
|
||||||
|
|
||||||
|
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
|
||||||
|
|
||||||
|
assert result.exit_code != 0
|
||||||
|
assert "at least 8" in result.output
|
||||||
|
|
||||||
|
|
||||||
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
|
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
|
||||||
db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
|
db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
|
||||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
|
|
@ -163,10 +241,26 @@ def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatc
|
||||||
assert "password must not be empty" in result.output
|
assert "password must not be empty" in result.output
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_new_password_rejects_empty():
|
||||||
|
from l4d2web.auth import validate_new_password
|
||||||
|
assert validate_new_password("") == "password must not be empty"
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_new_password_rejects_short():
|
||||||
|
from l4d2web.auth import MIN_PASSWORD_LENGTH, validate_new_password
|
||||||
|
assert MIN_PASSWORD_LENGTH == 8
|
||||||
|
assert validate_new_password("a" * 7) == "password must be at least 8 characters"
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_new_password_accepts_min_length():
|
||||||
|
from l4d2web.auth import validate_new_password
|
||||||
|
assert validate_new_password("a" * 8) is None
|
||||||
|
|
||||||
|
|
||||||
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
|
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
|
||||||
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
|
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
|
||||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
|
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
||||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||||
init_db()
|
init_db()
|
||||||
with session_scope() as session:
|
with session_scope() as session:
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import json
|
import json
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
@ -31,6 +32,7 @@ def user_client(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
@ -58,6 +60,7 @@ def linked_blueprint(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
|
|
||||||
return client, blueprint_id
|
return client, blueprint_id
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
|
@ -66,6 +67,7 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
@ -76,6 +78,7 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
||||||
text = response.get_data(as_text=True)
|
text = response.get_data(as_text=True)
|
||||||
|
|
@ -91,6 +94,7 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
|
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
|
||||||
text = response.get_data(as_text=True)
|
text = response.get_data(as_text=True)
|
||||||
|
|
|
||||||
|
|
@ -18,3 +18,23 @@ def test_create_user_and_blueprint(tmp_path, monkeypatch) -> None:
|
||||||
|
|
||||||
assert user.id is not None
|
assert user.id is not None
|
||||||
assert blueprint.id is not None
|
assert blueprint.id is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_has_password_changed_at_default(tmp_path, monkeypatch):
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
from l4d2web.app import create_app
|
||||||
|
from l4d2web.auth import hash_password
|
||||||
|
|
||||||
|
db_url = f"sqlite:///{tmp_path/'pw.db'}"
|
||||||
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
|
create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||||
|
init_db()
|
||||||
|
|
||||||
|
before = datetime.now(UTC).replace(tzinfo=None)
|
||||||
|
with session_scope() as db:
|
||||||
|
db.add(User(username="alice", password_digest=hash_password("secret")))
|
||||||
|
with session_scope() as db:
|
||||||
|
user = db.query(User).filter_by(username="alice").one()
|
||||||
|
|
||||||
|
assert user.password_changed_at is not None
|
||||||
|
assert user.password_changed_at >= before
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
|
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
|
|
@ -34,6 +35,7 @@ def _client_for(app, user_id: int):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ from l4d2web.auth import hash_password
|
||||||
from l4d2web.db import init_db, session_scope
|
from l4d2web.db import init_db, session_scope
|
||||||
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
|
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
|
||||||
from l4d2web.services.security import validate_overlay_ref
|
from l4d2web.services.security import validate_overlay_ref
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
|
@ -23,6 +24,7 @@ def admin_client(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = admin_id
|
sess["user_id"] = admin_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
@ -48,6 +50,7 @@ def user_client_with_overlay(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
@ -146,6 +149,7 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc
|
||||||
c = app.test_client()
|
c = app.test_client()
|
||||||
with c.session_transaction() as sess:
|
with c.session_transaction() as sess:
|
||||||
sess["user_id"] = uid
|
sess["user_id"] = uid
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return c
|
return c
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import pytest
|
import pytest
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
from l4d2web.app import create_app
|
from l4d2web.app import create_app
|
||||||
from l4d2web.auth import hash_password
|
from l4d2web.auth import hash_password
|
||||||
|
|
@ -35,6 +36,7 @@ def auth_client_with_server(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -61,6 +63,7 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = owner_id
|
sess["user_id"] = owner_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
return client, blueprint_id
|
return client, blueprint_id
|
||||||
|
|
||||||
|
|
@ -345,6 +348,7 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = admin_id
|
sess["user_id"] = admin_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
admin_page = client.get("/admin")
|
admin_page = client.get("/admin")
|
||||||
assert admin_page.status_code == 200
|
assert admin_page.status_code == 200
|
||||||
|
|
@ -380,6 +384,7 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None:
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = admin_id
|
sess["user_id"] = admin_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
response = client.get(f"/jobs/{job_id}")
|
response = client.get(f"/jobs/{job_id}")
|
||||||
|
|
||||||
|
|
@ -402,6 +407,7 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = admin_id
|
sess["user_id"] = admin_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
|
|
||||||
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
|
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
|
||||||
|
|
@ -457,6 +463,7 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None:
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
logged_in_response = client.get("/")
|
logged_in_response = client.get("/")
|
||||||
assert logged_in_response.status_code == 302
|
assert logged_in_response.status_code == 302
|
||||||
|
|
@ -514,6 +521,7 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
|
||||||
admin_client = app.test_client()
|
admin_client = app.test_client()
|
||||||
with admin_client.session_transaction() as sess:
|
with admin_client.session_transaction() as sess:
|
||||||
sess["user_id"] = admin_id
|
sess["user_id"] = admin_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
with session_scope() as db:
|
with session_scope() as db:
|
||||||
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
|
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
|
||||||
|
|
@ -541,6 +549,7 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
|
||||||
user_client = app.test_client()
|
user_client = app.test_client()
|
||||||
with user_client.session_transaction() as sess:
|
with user_client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
with session_scope() as db:
|
with session_scope() as db:
|
||||||
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
|
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
|
||||||
|
|
@ -717,6 +726,7 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = other_id
|
sess["user_id"] = other_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
response = client.get(f"/overlays/{overlay_id}/jobs")
|
response = client.get(f"/overlays/{overlay_id}/jobs")
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
|
||||||
258
l4d2web/tests/test_profile.py
Normal file
258
l4d2web/tests/test_profile.py
Normal file
|
|
@ -0,0 +1,258 @@
|
||||||
|
import pytest
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
from l4d2web.app import create_app
|
||||||
|
from l4d2web.auth import hash_password
|
||||||
|
from l4d2web.db import init_db, session_scope
|
||||||
|
from l4d2web.models import User
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def app_and_user(tmp_path, monkeypatch):
|
||||||
|
db_url = f"sqlite:///{tmp_path/'profile.db'}"
|
||||||
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||||
|
init_db()
|
||||||
|
with session_scope() as db:
|
||||||
|
u = User(username="alice", password_digest=hash_password("currentpass"))
|
||||||
|
db.add(u)
|
||||||
|
db.flush()
|
||||||
|
uid = u.id
|
||||||
|
marker = u.password_changed_at.isoformat()
|
||||||
|
return app, uid, marker
|
||||||
|
|
||||||
|
|
||||||
|
def _logged_in_client(app, uid, marker):
|
||||||
|
client = app.test_client()
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
sess["user_id"] = uid
|
||||||
|
sess["pw_changed_at"] = marker
|
||||||
|
sess["csrf_token"] = "test-token"
|
||||||
|
return client
|
||||||
|
|
||||||
|
|
||||||
|
def test_profile_requires_login(app_and_user):
|
||||||
|
app, _, _ = app_and_user
|
||||||
|
response = app.test_client().get("/profile", follow_redirects=False)
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/login" in response.headers["Location"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_profile_page_renders(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
response = client.get("/profile")
|
||||||
|
assert response.status_code == 200
|
||||||
|
body = response.get_data(as_text=True)
|
||||||
|
assert "Change password" in body
|
||||||
|
assert 'name="current_password"' in body
|
||||||
|
assert 'name="new_password"' in body
|
||||||
|
assert 'name="confirm_new_password"' in body
|
||||||
|
|
||||||
|
|
||||||
|
def test_base_template_links_username_to_profile(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
response = client.get("/dashboard")
|
||||||
|
body = response.get_data(as_text=True)
|
||||||
|
assert 'href="/profile"' in body
|
||||||
|
assert ">alice<" in body
|
||||||
|
|
||||||
|
|
||||||
|
def _post_pw(client, **fields):
|
||||||
|
payload = {"csrf_token": "test-token", **fields}
|
||||||
|
return client.post("/profile/password", data=payload, follow_redirects=False)
|
||||||
|
|
||||||
|
|
||||||
|
def _digest_of(username):
|
||||||
|
with session_scope() as db:
|
||||||
|
u = db.scalar(select(User).where(User.username == username))
|
||||||
|
return u.password_digest
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_rejects_missing_csrf(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
before = _digest_of("alice")
|
||||||
|
response = client.post(
|
||||||
|
"/profile/password",
|
||||||
|
data={
|
||||||
|
"current_password": "currentpass",
|
||||||
|
"new_password": "newpass12",
|
||||||
|
"confirm_new_password": "newpass12",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert _digest_of("alice") == before
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_requires_login(app_and_user):
|
||||||
|
app, _, _ = app_and_user
|
||||||
|
client = app.test_client()
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
sess["csrf_token"] = "test-token"
|
||||||
|
response = client.post(
|
||||||
|
"/profile/password",
|
||||||
|
data={
|
||||||
|
"csrf_token": "test-token",
|
||||||
|
"current_password": "x",
|
||||||
|
"new_password": "y" * 8,
|
||||||
|
"confirm_new_password": "y" * 8,
|
||||||
|
},
|
||||||
|
follow_redirects=False,
|
||||||
|
)
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/login" in response.headers["Location"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_empty_fields_redirects_with_error(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
before = _digest_of("alice")
|
||||||
|
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/profile?error=fields_required" in response.headers["Location"]
|
||||||
|
assert _digest_of("alice") == before
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_mismatched_confirm(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
before = _digest_of("alice")
|
||||||
|
response = _post_pw(
|
||||||
|
client,
|
||||||
|
current_password="currentpass",
|
||||||
|
new_password="newpass12",
|
||||||
|
confirm_new_password="newpass99",
|
||||||
|
)
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/profile?error=mismatch" in response.headers["Location"]
|
||||||
|
assert _digest_of("alice") == before
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_too_short(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
before = _digest_of("alice")
|
||||||
|
response = _post_pw(
|
||||||
|
client,
|
||||||
|
current_password="currentpass",
|
||||||
|
new_password="short7x",
|
||||||
|
confirm_new_password="short7x",
|
||||||
|
)
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/profile?error=too_short" in response.headers["Location"]
|
||||||
|
assert _digest_of("alice") == before
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_wrong_current(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
before = _digest_of("alice")
|
||||||
|
response = _post_pw(
|
||||||
|
client,
|
||||||
|
current_password="WRONG",
|
||||||
|
new_password="newpass12",
|
||||||
|
confirm_new_password="newpass12",
|
||||||
|
)
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert "/profile?error=wrong_current" in response.headers["Location"]
|
||||||
|
assert _digest_of("alice") == before
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_happy_path_rotates_and_keeps_current_session(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
before_digest = _digest_of("alice")
|
||||||
|
with session_scope() as db:
|
||||||
|
before_marker = db.scalar(select(User).where(User.id == uid)).password_changed_at
|
||||||
|
|
||||||
|
response = _post_pw(
|
||||||
|
client,
|
||||||
|
current_password="currentpass",
|
||||||
|
new_password="newpass12",
|
||||||
|
confirm_new_password="newpass12",
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert response.headers["Location"].endswith("/profile?success=1")
|
||||||
|
|
||||||
|
after_digest = _digest_of("alice")
|
||||||
|
assert after_digest != before_digest
|
||||||
|
with session_scope() as db:
|
||||||
|
u = db.scalar(select(User).where(User.id == uid))
|
||||||
|
assert u.password_changed_at > before_marker
|
||||||
|
new_marker = u.password_changed_at.isoformat()
|
||||||
|
|
||||||
|
follow = client.get("/dashboard")
|
||||||
|
assert follow.status_code == 200
|
||||||
|
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
assert sess["pw_changed_at"] == new_marker
|
||||||
|
|
||||||
|
|
||||||
|
def test_other_session_dies_after_password_change(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
primary = _logged_in_client(app, uid, marker)
|
||||||
|
other = _logged_in_client(app, uid, marker)
|
||||||
|
|
||||||
|
pre = other.get("/dashboard")
|
||||||
|
assert pre.status_code == 200
|
||||||
|
|
||||||
|
response = _post_pw(
|
||||||
|
primary,
|
||||||
|
current_password="currentpass",
|
||||||
|
new_password="newpass12",
|
||||||
|
confirm_new_password="newpass12",
|
||||||
|
)
|
||||||
|
assert response.status_code == 302
|
||||||
|
|
||||||
|
post = other.get("/dashboard", follow_redirects=False)
|
||||||
|
assert post.status_code == 302
|
||||||
|
assert "/login" in post.headers["Location"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_new_password_works_for_login(app_and_user):
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
_post_pw(
|
||||||
|
client,
|
||||||
|
current_password="currentpass",
|
||||||
|
new_password="newpass12",
|
||||||
|
confirm_new_password="newpass12",
|
||||||
|
)
|
||||||
|
|
||||||
|
fresh = app.test_client()
|
||||||
|
response = fresh.post(
|
||||||
|
"/login",
|
||||||
|
data={"username": "alice", "password": "newpass12"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert response.headers["Location"].endswith("/dashboard")
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_password_rate_limited(app_and_user):
|
||||||
|
from l4d2web.routes.profile_routes import (
|
||||||
|
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
|
||||||
|
reset_profile_password_rate_limits,
|
||||||
|
)
|
||||||
|
reset_profile_password_rate_limits()
|
||||||
|
|
||||||
|
app, uid, marker = app_and_user
|
||||||
|
client = _logged_in_client(app, uid, marker)
|
||||||
|
|
||||||
|
for _ in range(PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS):
|
||||||
|
_post_pw(
|
||||||
|
client,
|
||||||
|
current_password="WRONG",
|
||||||
|
new_password="newpass12",
|
||||||
|
confirm_new_password="newpass12",
|
||||||
|
)
|
||||||
|
|
||||||
|
blocked = _post_pw(
|
||||||
|
client,
|
||||||
|
current_password="WRONG",
|
||||||
|
new_password="newpass12",
|
||||||
|
confirm_new_password="newpass12",
|
||||||
|
)
|
||||||
|
assert blocked.status_code == 429
|
||||||
31
l4d2web/tests/test_rate_limit.py
Normal file
31
l4d2web/tests/test_rate_limit.py
Normal file
|
|
@ -0,0 +1,31 @@
|
||||||
|
import time
|
||||||
|
|
||||||
|
from l4d2web.services.rate_limit import check_rate_limit
|
||||||
|
|
||||||
|
|
||||||
|
def test_under_threshold_allows():
|
||||||
|
bucket: dict[str, list[float]] = {}
|
||||||
|
for _ in range(3):
|
||||||
|
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_at_threshold_blocks():
|
||||||
|
bucket: dict[str, list[float]] = {}
|
||||||
|
for _ in range(5):
|
||||||
|
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
|
||||||
|
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_other_ips_independent():
|
||||||
|
bucket: dict[str, list[float]] = {}
|
||||||
|
for _ in range(5):
|
||||||
|
check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5)
|
||||||
|
assert check_rate_limit(bucket, "5.6.7.8", window=60.0, max_attempts=5) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_old_attempts_expire():
|
||||||
|
bucket: dict[str, list[float]] = {}
|
||||||
|
for _ in range(5):
|
||||||
|
check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5)
|
||||||
|
time.sleep(0.1)
|
||||||
|
assert check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) is False
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
"""Routes for type='script' overlays: create, /script (update body),
|
"""Routes for type='script' overlays: create, /script (update body),
|
||||||
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
|
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
@ -52,6 +53,7 @@ def _client_for(app, user_id: int):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import json
|
import json
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
@ -32,6 +33,7 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = payload["user_id"]
|
sess["user_id"] = payload["user_id"]
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
|
|
||||||
return client, payload
|
return client, payload
|
||||||
|
|
@ -52,6 +54,7 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
|
|
||||||
response = client.get("/servers")
|
response = client.get("/servers")
|
||||||
|
|
@ -235,6 +238,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = alice_id
|
sess["user_id"] = alice_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
alice_resp = client.post(
|
alice_resp = client.post(
|
||||||
"/servers",
|
"/servers",
|
||||||
|
|
@ -245,6 +249,7 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
|
||||||
|
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = bob_id
|
sess["user_id"] = bob_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
bob_resp = client.post(
|
bob_resp = client.post(
|
||||||
"/servers",
|
"/servers",
|
||||||
|
|
@ -318,6 +323,7 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
|
|
||||||
first = client.post(
|
first = client.post(
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
from l4d2web.app import create_app
|
from l4d2web.app import create_app
|
||||||
from l4d2web.auth import hash_password
|
from l4d2web.auth import hash_password
|
||||||
|
|
@ -32,6 +33,7 @@ def owner_client_with_server(tmp_path, monkeypatch):
|
||||||
client = app.test_client()
|
client = app.test_client()
|
||||||
with client.session_transaction() as sess:
|
with client.session_transaction() as sess:
|
||||||
sess["user_id"] = user_id
|
sess["user_id"] = user_id
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
|
|
||||||
return client, server_id
|
return client, server_id
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
"""Tests for the workshop overlay routes (add items, remove items, build,
|
"""Tests for the workshop overlay routes (add items, remove items, build,
|
||||||
admin refresh)."""
|
admin refresh)."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
from typing import Iterable
|
from typing import Iterable
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
@ -54,6 +55,7 @@ def env_user(tmp_path, monkeypatch):
|
||||||
c = app.test_client()
|
c = app.test_client()
|
||||||
with c.session_transaction() as sess:
|
with c.session_transaction() as sess:
|
||||||
sess["user_id"] = uid
|
sess["user_id"] = uid
|
||||||
|
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||||
sess["csrf_token"] = "test-token"
|
sess["csrf_token"] = "test-token"
|
||||||
return c
|
return c
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue