Compare commits
No commits in common. "cb52a69faf23355f310eaf319e750b7b522997b4" and "e1149704c8e280baef83b04aaaa85bc410ecaeaf" have entirely different histories.
cb52a69faf
...
e1149704c8
25 changed files with 22 additions and 1943 deletions
File diff suppressed because it is too large
Load diff
|
|
@ -1,32 +0,0 @@
|
|||
"""users.password_changed_at
|
||||
|
||||
Revision ID: 0009_user_password_changed_at
|
||||
Revises: 0008_user_active
|
||||
Create Date: 2026-05-11
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
|
||||
revision: str = "0009_user_password_changed_at"
|
||||
down_revision: Union[str, Sequence[str], None] = "0008_user_active"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
with op.batch_alter_table("users") as batch_op:
|
||||
batch_op.add_column(sa.Column("password_changed_at", sa.DateTime(), nullable=True))
|
||||
op.execute(
|
||||
"UPDATE users SET password_changed_at = created_at "
|
||||
"WHERE password_changed_at IS NULL"
|
||||
)
|
||||
with op.batch_alter_table("users") as batch_op:
|
||||
batch_op.alter_column("password_changed_at", nullable=False)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
with op.batch_alter_table("users") as batch_op:
|
||||
batch_op.drop_column("password_changed_at")
|
||||
|
|
@ -16,8 +16,6 @@ from l4d2web.routes.job_routes import bp as job_bp
|
|||
from l4d2web.routes.log_routes import bp as log_bp
|
||||
from l4d2web.routes.overlay_routes import bp as overlay_bp
|
||||
from l4d2web.routes.page_routes import bp as page_bp
|
||||
from l4d2web.routes.profile_routes import bp as profile_bp
|
||||
from l4d2web.routes.profile_routes import reset_profile_password_rate_limits
|
||||
from l4d2web.routes.server_routes import bp as server_bp
|
||||
from l4d2web.routes.workshop_routes import bp as workshop_bp
|
||||
from l4d2web.services.job_worker import (
|
||||
|
|
@ -84,11 +82,9 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
|||
app.register_blueprint(job_bp)
|
||||
app.register_blueprint(log_bp)
|
||||
app.register_blueprint(page_bp)
|
||||
app.register_blueprint(profile_bp)
|
||||
register_cli(app)
|
||||
if app.config.get("TESTING"):
|
||||
reset_login_rate_limits()
|
||||
reset_profile_password_rate_limits()
|
||||
should_start_workers = (
|
||||
app.config.get("JOB_WORKER_ENABLED")
|
||||
and not app.config.get("TESTING")
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
from datetime import datetime
|
||||
from functools import wraps
|
||||
from typing import Callable, TypeVar
|
||||
from urllib.parse import quote, unquote
|
||||
|
|
@ -22,17 +21,6 @@ def verify_password(raw: str, digest: str) -> bool:
|
|||
return check_password_hash(digest, raw)
|
||||
|
||||
|
||||
MIN_PASSWORD_LENGTH = 8
|
||||
|
||||
|
||||
def validate_new_password(raw: str) -> str | None:
|
||||
if raw == "":
|
||||
return "password must not be empty"
|
||||
if len(raw) < MIN_PASSWORD_LENGTH:
|
||||
return f"password must be at least {MIN_PASSWORD_LENGTH} characters"
|
||||
return None
|
||||
|
||||
|
||||
def load_current_user() -> None:
|
||||
user_id = session.get("user_id")
|
||||
if user_id is None:
|
||||
|
|
@ -40,38 +28,17 @@ def load_current_user() -> None:
|
|||
return
|
||||
with session_scope() as db:
|
||||
user = db.scalar(select(User).where(User.id == int(user_id)))
|
||||
if user is None or not user.active:
|
||||
g.user = None
|
||||
return
|
||||
|
||||
marker = session.get("pw_changed_at")
|
||||
if marker is None:
|
||||
g.user = None
|
||||
return
|
||||
try:
|
||||
marker_dt = datetime.fromisoformat(marker)
|
||||
except ValueError:
|
||||
g.user = None
|
||||
return
|
||||
# user.password_changed_at comes back naive from SQLite; strip tz from the
|
||||
# marker so an aware-marker session (just stamped from an in-memory user)
|
||||
# compares cleanly with a freshly-loaded user row.
|
||||
if marker_dt.tzinfo is not None:
|
||||
marker_dt = marker_dt.replace(tzinfo=None)
|
||||
if marker_dt < user.password_changed_at:
|
||||
g.user = None
|
||||
return
|
||||
|
||||
g.user = user
|
||||
# Treat deactivated users as logged-out so existing sessions stop
|
||||
# working as soon as an admin flips active=False.
|
||||
g.user = user if user is not None and user.active else None
|
||||
|
||||
|
||||
def current_user() -> User | None:
|
||||
return getattr(g, "user", None)
|
||||
|
||||
|
||||
def login_user(user_id: int, password_changed_at) -> None:
|
||||
def login_user(user_id: int) -> None:
|
||||
session["user_id"] = user_id
|
||||
session["pw_changed_at"] = password_changed_at.isoformat()
|
||||
|
||||
|
||||
def logout_user() -> None:
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import click
|
|||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.auth import hash_password, validate_new_password
|
||||
from l4d2web.auth import hash_password
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import Overlay, User
|
||||
from l4d2web.services.overlay_creation import (
|
||||
|
|
@ -31,9 +31,8 @@ def create_user(username: str, admin: bool) -> None:
|
|||
password = os.getenv("LEFT4ME_ADMIN_PASSWORD")
|
||||
if password is None:
|
||||
password = click.prompt("Password", hide_input=True, confirmation_prompt=True)
|
||||
policy_error = validate_new_password(password)
|
||||
if policy_error is not None:
|
||||
raise click.ClickException(policy_error)
|
||||
if password == "":
|
||||
raise click.ClickException("password must not be empty")
|
||||
|
||||
try:
|
||||
with session_scope() as db:
|
||||
|
|
|
|||
|
|
@ -35,7 +35,6 @@ class User(Base):
|
|||
)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||
password_changed_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
|
||||
|
||||
|
||||
class Overlay(Base):
|
||||
|
|
|
|||
|
|
@ -1,15 +1,16 @@
|
|||
import time
|
||||
|
||||
from flask import Blueprint, Response, redirect, render_template, request
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.auth import hash_password, is_safe_next, login_user, logout_user, verify_password
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import User
|
||||
from l4d2web.services.rate_limit import check_rate_limit
|
||||
|
||||
|
||||
bp = Blueprint("auth", __name__)
|
||||
_TIMING_DUMMY_DIGEST = hash_password("__timing_dummy__")
|
||||
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
|
||||
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
||||
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||
|
||||
|
|
@ -19,12 +20,14 @@ def reset_login_rate_limits() -> None:
|
|||
|
||||
|
||||
def is_login_rate_limited(remote_addr: str) -> bool:
|
||||
return check_rate_limit(
|
||||
LOGIN_ATTEMPTS_BY_IP,
|
||||
remote_addr,
|
||||
window=LOGIN_RATE_LIMIT_WINDOW_SECONDS,
|
||||
max_attempts=LOGIN_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
)
|
||||
now = time.time()
|
||||
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, [])
|
||||
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS
|
||||
attempts[:] = [ts for ts in attempts if ts >= cutoff]
|
||||
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
|
||||
return True
|
||||
attempts.append(now)
|
||||
return False
|
||||
|
||||
|
||||
@bp.get("/login")
|
||||
|
|
@ -49,7 +52,7 @@ def login() -> Response:
|
|||
# Same generic response for missing user, wrong password, or
|
||||
# deactivated account — no timing oracle for deactivation status.
|
||||
return Response("invalid credentials", status=401)
|
||||
login_user(user.id, user.password_changed_at)
|
||||
login_user(user.id)
|
||||
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
||||
next_target = request.form.get("next", "")
|
||||
return redirect(next_target if is_safe_next(next_target) else "/dashboard")
|
||||
|
|
|
|||
|
|
@ -1,93 +0,0 @@
|
|||
from flask import Blueprint, Response, redirect, render_template, request, session
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.auth import (
|
||||
current_user,
|
||||
hash_password,
|
||||
require_login,
|
||||
validate_new_password,
|
||||
verify_password,
|
||||
)
|
||||
from l4d2web.db import session_scope
|
||||
from l4d2web.models import User, now_utc
|
||||
from l4d2web.services.rate_limit import check_rate_limit
|
||||
|
||||
|
||||
bp = Blueprint("profile", __name__)
|
||||
|
||||
|
||||
_ERROR_MESSAGES = {
|
||||
"fields_required": "Fill in all three fields.",
|
||||
"mismatch": "New password and confirmation do not match.",
|
||||
"too_short": "New password must be at least 8 characters.",
|
||||
"empty": "New password must not be empty.",
|
||||
"wrong_current": "Current password is incorrect.",
|
||||
}
|
||||
|
||||
|
||||
PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS = 60.0
|
||||
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS = 10
|
||||
PROFILE_PW_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
||||
|
||||
|
||||
def reset_profile_password_rate_limits() -> None:
|
||||
PROFILE_PW_ATTEMPTS_BY_IP.clear()
|
||||
|
||||
|
||||
def _redirect_with_error(error_key: str) -> Response:
|
||||
return redirect(f"/profile?error={error_key}")
|
||||
|
||||
|
||||
@bp.get("/profile")
|
||||
@require_login
|
||||
def profile_page() -> str:
|
||||
error_key = request.args.get("error", "")
|
||||
success = request.args.get("success") == "1"
|
||||
return render_template(
|
||||
"profile.html",
|
||||
error_message=_ERROR_MESSAGES.get(error_key, ""),
|
||||
success=success,
|
||||
)
|
||||
|
||||
|
||||
@bp.post("/profile/password")
|
||||
@require_login
|
||||
def profile_password_change() -> Response:
|
||||
remote_addr = request.remote_addr or "unknown"
|
||||
if check_rate_limit(
|
||||
PROFILE_PW_ATTEMPTS_BY_IP,
|
||||
remote_addr,
|
||||
window=PROFILE_PW_RATE_LIMIT_WINDOW_SECONDS,
|
||||
max_attempts=PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
):
|
||||
return Response("too many attempts", status=429)
|
||||
|
||||
current_password = request.form.get("current_password", "")
|
||||
new_password = request.form.get("new_password", "")
|
||||
confirm_new_password = request.form.get("confirm_new_password", "")
|
||||
|
||||
if not current_password or not new_password or not confirm_new_password:
|
||||
return _redirect_with_error("fields_required")
|
||||
if new_password != confirm_new_password:
|
||||
return _redirect_with_error("mismatch")
|
||||
policy_error = validate_new_password(new_password)
|
||||
if policy_error is not None:
|
||||
error_key = "empty" if policy_error == "password must not be empty" else "too_short"
|
||||
return _redirect_with_error(error_key)
|
||||
|
||||
actor = current_user()
|
||||
assert actor is not None # @require_login
|
||||
|
||||
with session_scope() as db:
|
||||
user = db.scalar(select(User).where(User.id == actor.id))
|
||||
if user is None or not verify_password(current_password, user.password_digest):
|
||||
return _redirect_with_error("wrong_current")
|
||||
user.password_digest = hash_password(new_password)
|
||||
# Strip tz so the marker matches what a subsequent DB read returns
|
||||
# (SQLite DateTime columns don't preserve tzinfo).
|
||||
user.password_changed_at = now_utc().replace(tzinfo=None)
|
||||
new_marker = user.password_changed_at.isoformat()
|
||||
|
||||
session["pw_changed_at"] = new_marker
|
||||
|
||||
return redirect("/profile?success=1")
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
import time
|
||||
|
||||
|
||||
def check_rate_limit(
|
||||
bucket: dict[str, list[float]],
|
||||
remote_addr: str,
|
||||
*,
|
||||
window: float,
|
||||
max_attempts: int,
|
||||
) -> bool:
|
||||
"""Return True if this request should be rate-limited (rejected).
|
||||
|
||||
The bucket is owned by the caller so each endpoint can keep an
|
||||
independent count and tests can reset state cleanly.
|
||||
"""
|
||||
now = time.time()
|
||||
attempts = bucket.setdefault(remote_addr, [])
|
||||
cutoff = now - window
|
||||
attempts[:] = [ts for ts in attempts if ts >= cutoff]
|
||||
if len(attempts) >= max_attempts:
|
||||
return True
|
||||
attempts.append(now)
|
||||
return False
|
||||
|
|
@ -24,7 +24,7 @@
|
|||
{% if g.user %}
|
||||
<nav class="account-nav" aria-label="Account navigation">
|
||||
{% if g.user.admin %}<a href="/admin">admin</a>{% endif %}
|
||||
<a class="muted" href="/profile">{{ g.user.username }}</a>
|
||||
<span class="muted">{{ g.user.username }}</span>
|
||||
<form method="post" action="/logout" class="inline-form">
|
||||
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
|
||||
<button class="link-button" type="submit">logout</button>
|
||||
|
|
|
|||
|
|
@ -1,33 +0,0 @@
|
|||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Profile | left4me{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<section class="panel">
|
||||
<h1>Change password</h1>
|
||||
|
||||
{% if success %}
|
||||
<p class="muted">Password changed.</p>
|
||||
{% endif %}
|
||||
{% if error_message %}
|
||||
<p class="error">{{ error_message }}</p>
|
||||
{% endif %}
|
||||
|
||||
<form method="post" action="/profile/password" class="form">
|
||||
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
|
||||
<label>
|
||||
Current password
|
||||
<input type="password" name="current_password" autocomplete="current-password" required>
|
||||
</label>
|
||||
<label>
|
||||
New password
|
||||
<input type="password" name="new_password" autocomplete="new-password" required>
|
||||
</label>
|
||||
<label>
|
||||
Confirm new password
|
||||
<input type="password" name="confirm_new_password" autocomplete="new-password" required>
|
||||
</label>
|
||||
<button type="submit">Change password</button>
|
||||
</form>
|
||||
</section>
|
||||
{% endblock %}
|
||||
|
|
@ -25,12 +25,10 @@ def admin_client(tmp_path, monkeypatch):
|
|||
db.add_all([admin, second_admin])
|
||||
db.flush()
|
||||
admin_id = admin.id
|
||||
admin_marker = admin.password_changed_at.isoformat()
|
||||
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = admin_marker
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client, admin_id
|
||||
|
||||
|
|
@ -126,15 +124,11 @@ def test_deactivated_user_existing_session_invalidated(admin_client):
|
|||
"""An active session at the moment of deactivation stops working."""
|
||||
client, _ = admin_client
|
||||
target = _add_user("bob")
|
||||
with session_scope() as db:
|
||||
bob = db.scalar(select(User).where(User.id == target))
|
||||
bob_marker = bob.password_changed_at.isoformat()
|
||||
|
||||
# Forge a session for bob.
|
||||
bob_client = client.application.test_client()
|
||||
with bob_client.session_transaction() as sess:
|
||||
sess["user_id"] = target
|
||||
sess["pw_changed_at"] = bob_marker
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
# Sanity: bob can hit a logged-in route.
|
||||
|
|
|
|||
|
|
@ -134,75 +134,10 @@ def test_login_sets_session(client) -> None:
|
|||
assert sess.get("user_id") is not None
|
||||
|
||||
|
||||
def test_load_current_user_rejects_missing_marker(client) -> None:
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("secret"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
|
||||
response = client.get("/dashboard")
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_load_current_user_rejects_stale_marker(client) -> None:
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("secret"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
|
||||
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = stale.isoformat()
|
||||
|
||||
response = client.get("/dashboard")
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_load_current_user_accepts_current_marker(client) -> None:
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("secret"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
marker = u.password_changed_at.isoformat()
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = marker
|
||||
|
||||
response = client.get("/dashboard")
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_login_stamps_password_changed_at_in_session(client) -> None:
|
||||
with session_scope() as session:
|
||||
session.add(User(username="alice", password_digest=hash_password("secret")))
|
||||
|
||||
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
||||
assert response.status_code == 302
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
marker = sess.get("pw_changed_at")
|
||||
assert marker is not None
|
||||
with session_scope() as session:
|
||||
user = session.query(User).filter_by(username="alice").one()
|
||||
assert marker == user.password_changed_at.isoformat()
|
||||
|
||||
|
||||
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
|
|
@ -215,19 +150,6 @@ def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> Non
|
|||
assert user.admin is True
|
||||
|
||||
|
||||
def test_create_user_cli_rejects_short_password(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'short_pw.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "short7x")
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert "at least 8" in result.output
|
||||
|
||||
|
||||
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
|
|
@ -241,26 +163,10 @@ def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatc
|
|||
assert "password must not be empty" in result.output
|
||||
|
||||
|
||||
def test_validate_new_password_rejects_empty():
|
||||
from l4d2web.auth import validate_new_password
|
||||
assert validate_new_password("") == "password must not be empty"
|
||||
|
||||
|
||||
def test_validate_new_password_rejects_short():
|
||||
from l4d2web.auth import MIN_PASSWORD_LENGTH, validate_new_password
|
||||
assert MIN_PASSWORD_LENGTH == 8
|
||||
assert validate_new_password("a" * 7) == "password must be at least 8 characters"
|
||||
|
||||
|
||||
def test_validate_new_password_accepts_min_length():
|
||||
from l4d2web.auth import validate_new_password
|
||||
assert validate_new_password("a" * 8) is None
|
||||
|
||||
|
||||
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
|
||||
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
||||
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secret")
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
with session_scope() as session:
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
import json
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
|
@ -32,7 +31,6 @@ def user_client(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
@ -60,7 +58,6 @@ def linked_blueprint(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
return client, blueprint_id
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
from pathlib import Path
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
|
|
@ -67,7 +66,6 @@ def test_sse_resume_from_last_seq(seeded_job_logs) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
||||
assert response.status_code == 200
|
||||
|
|
@ -78,7 +76,6 @@ def test_sse_replays_custom_job_log_events(seeded_job_logs) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}/stream?last_seq=5")
|
||||
text = response.get_data(as_text=True)
|
||||
|
|
@ -94,7 +91,6 @@ def test_sse_resumes_from_last_event_id_header(seeded_job_logs) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}/stream", headers={"Last-Event-ID": "6"})
|
||||
text = response.get_data(as_text=True)
|
||||
|
|
|
|||
|
|
@ -18,23 +18,3 @@ def test_create_user_and_blueprint(tmp_path, monkeypatch) -> None:
|
|||
|
||||
assert user.id is not None
|
||||
assert blueprint.id is not None
|
||||
|
||||
|
||||
def test_user_has_password_changed_at_default(tmp_path, monkeypatch):
|
||||
from datetime import UTC, datetime
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
|
||||
db_url = f"sqlite:///{tmp_path/'pw.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
|
||||
before = datetime.now(UTC).replace(tzinfo=None)
|
||||
with session_scope() as db:
|
||||
db.add(User(username="alice", password_digest=hash_password("secret")))
|
||||
with session_scope() as db:
|
||||
user = db.query(User).filter_by(username="alice").one()
|
||||
|
||||
assert user.password_changed_at is not None
|
||||
assert user.password_changed_at >= before
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
"""HTTP-level tests for the overlay 'Files' tree-fragment + download routes."""
|
||||
from __future__ import annotations
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import io
|
||||
import os
|
||||
|
|
@ -35,7 +34,6 @@ def _client_for(app, user_id: int):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ from l4d2web.auth import hash_password
|
|||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
|
||||
from l4d2web.services.security import validate_overlay_ref
|
||||
from datetime import UTC, datetime
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
@ -24,7 +23,6 @@ def admin_client(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
@ -50,7 +48,6 @@ def user_client_with_overlay(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
@ -149,7 +146,6 @@ def test_two_users_can_have_workshop_overlay_with_same_name(tmp_path, monkeypatc
|
|||
c = app.test_client()
|
||||
with c.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return c
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
import pytest
|
||||
from pathlib import Path
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
|
|
@ -36,7 +35,6 @@ def auth_client_with_server(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
return client
|
||||
|
||||
|
||||
|
|
@ -63,7 +61,6 @@ def user_client_and_other_blueprint(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = owner_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
return client, blueprint_id
|
||||
|
||||
|
|
@ -348,7 +345,6 @@ def test_admin_can_use_admin_pages(tmp_path, monkeypatch) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
admin_page = client.get("/admin")
|
||||
assert admin_page.status_code == 200
|
||||
|
|
@ -384,7 +380,6 @@ def test_admin_can_view_other_users_job(tmp_path, monkeypatch) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/jobs/{job_id}")
|
||||
|
||||
|
|
@ -407,7 +402,6 @@ def test_admin_can_enqueue_runtime_install_job(tmp_path, monkeypatch) -> None:
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
response = client.post("/admin/install", headers={"X-CSRF-Token": "test-token"})
|
||||
|
|
@ -463,7 +457,6 @@ def test_root_redirects_by_auth_state(tmp_path, monkeypatch) -> None:
|
|||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
logged_in_response = client.get("/")
|
||||
assert logged_in_response.status_code == 302
|
||||
|
|
@ -521,7 +514,6 @@ def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
|
|||
admin_client = app.test_client()
|
||||
with admin_client.session_transaction() as sess:
|
||||
sess["user_id"] = admin_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
with session_scope() as db:
|
||||
db.add(Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued"))
|
||||
|
|
@ -549,7 +541,6 @@ def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
|
|||
user_client = app.test_client()
|
||||
with user_client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
with session_scope() as db:
|
||||
job = Job(user_id=None, server_id=None, operation="refresh_workshop_items", state="queued")
|
||||
|
|
@ -726,7 +717,6 @@ def test_overlay_jobs_page_403_for_other_users_private_overlay(tmp_path, monkeyp
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = other_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
response = client.get(f"/overlays/{overlay_id}/jobs")
|
||||
assert response.status_code == 403
|
||||
|
|
|
|||
|
|
@ -1,258 +0,0 @@
|
|||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
from l4d2web.db import init_db, session_scope
|
||||
from l4d2web.models import User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app_and_user(tmp_path, monkeypatch):
|
||||
db_url = f"sqlite:///{tmp_path/'profile.db'}"
|
||||
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||
init_db()
|
||||
with session_scope() as db:
|
||||
u = User(username="alice", password_digest=hash_password("currentpass"))
|
||||
db.add(u)
|
||||
db.flush()
|
||||
uid = u.id
|
||||
marker = u.password_changed_at.isoformat()
|
||||
return app, uid, marker
|
||||
|
||||
|
||||
def _logged_in_client(app, uid, marker):
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = marker
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
||||
def test_profile_requires_login(app_and_user):
|
||||
app, _, _ = app_and_user
|
||||
response = app.test_client().get("/profile", follow_redirects=False)
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_profile_page_renders(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
response = client.get("/profile")
|
||||
assert response.status_code == 200
|
||||
body = response.get_data(as_text=True)
|
||||
assert "Change password" in body
|
||||
assert 'name="current_password"' in body
|
||||
assert 'name="new_password"' in body
|
||||
assert 'name="confirm_new_password"' in body
|
||||
|
||||
|
||||
def test_base_template_links_username_to_profile(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
response = client.get("/dashboard")
|
||||
body = response.get_data(as_text=True)
|
||||
assert 'href="/profile"' in body
|
||||
assert ">alice<" in body
|
||||
|
||||
|
||||
def _post_pw(client, **fields):
|
||||
payload = {"csrf_token": "test-token", **fields}
|
||||
return client.post("/profile/password", data=payload, follow_redirects=False)
|
||||
|
||||
|
||||
def _digest_of(username):
|
||||
with session_scope() as db:
|
||||
u = db.scalar(select(User).where(User.username == username))
|
||||
return u.password_digest
|
||||
|
||||
|
||||
def test_post_password_rejects_missing_csrf(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = client.post(
|
||||
"/profile/password",
|
||||
data={
|
||||
"current_password": "currentpass",
|
||||
"new_password": "newpass12",
|
||||
"confirm_new_password": "newpass12",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_requires_login(app_and_user):
|
||||
app, _, _ = app_and_user
|
||||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["csrf_token"] = "test-token"
|
||||
response = client.post(
|
||||
"/profile/password",
|
||||
data={
|
||||
"csrf_token": "test-token",
|
||||
"current_password": "x",
|
||||
"new_password": "y" * 8,
|
||||
"confirm_new_password": "y" * 8,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/login" in response.headers["Location"]
|
||||
|
||||
|
||||
def test_post_password_empty_fields_redirects_with_error(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=fields_required" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_mismatched_confirm(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass99",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=mismatch" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_too_short(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="short7x",
|
||||
confirm_new_password="short7x",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=too_short" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_wrong_current(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before = _digest_of("alice")
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="WRONG",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert "/profile?error=wrong_current" in response.headers["Location"]
|
||||
assert _digest_of("alice") == before
|
||||
|
||||
|
||||
def test_post_password_happy_path_rotates_and_keeps_current_session(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
before_digest = _digest_of("alice")
|
||||
with session_scope() as db:
|
||||
before_marker = db.scalar(select(User).where(User.id == uid)).password_changed_at
|
||||
|
||||
response = _post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
|
||||
assert response.status_code == 302
|
||||
assert response.headers["Location"].endswith("/profile?success=1")
|
||||
|
||||
after_digest = _digest_of("alice")
|
||||
assert after_digest != before_digest
|
||||
with session_scope() as db:
|
||||
u = db.scalar(select(User).where(User.id == uid))
|
||||
assert u.password_changed_at > before_marker
|
||||
new_marker = u.password_changed_at.isoformat()
|
||||
|
||||
follow = client.get("/dashboard")
|
||||
assert follow.status_code == 200
|
||||
|
||||
with client.session_transaction() as sess:
|
||||
assert sess["pw_changed_at"] == new_marker
|
||||
|
||||
|
||||
def test_other_session_dies_after_password_change(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
primary = _logged_in_client(app, uid, marker)
|
||||
other = _logged_in_client(app, uid, marker)
|
||||
|
||||
pre = other.get("/dashboard")
|
||||
assert pre.status_code == 200
|
||||
|
||||
response = _post_pw(
|
||||
primary,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
assert response.status_code == 302
|
||||
|
||||
post = other.get("/dashboard", follow_redirects=False)
|
||||
assert post.status_code == 302
|
||||
assert "/login" in post.headers["Location"]
|
||||
|
||||
|
||||
def test_new_password_works_for_login(app_and_user):
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
_post_pw(
|
||||
client,
|
||||
current_password="currentpass",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
|
||||
fresh = app.test_client()
|
||||
response = fresh.post(
|
||||
"/login",
|
||||
data={"username": "alice", "password": "newpass12"},
|
||||
)
|
||||
assert response.status_code == 302
|
||||
assert response.headers["Location"].endswith("/dashboard")
|
||||
|
||||
|
||||
def test_post_password_rate_limited(app_and_user):
|
||||
from l4d2web.routes.profile_routes import (
|
||||
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
|
||||
reset_profile_password_rate_limits,
|
||||
)
|
||||
reset_profile_password_rate_limits()
|
||||
|
||||
app, uid, marker = app_and_user
|
||||
client = _logged_in_client(app, uid, marker)
|
||||
|
||||
for _ in range(PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS):
|
||||
_post_pw(
|
||||
client,
|
||||
current_password="WRONG",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
|
||||
blocked = _post_pw(
|
||||
client,
|
||||
current_password="WRONG",
|
||||
new_password="newpass12",
|
||||
confirm_new_password="newpass12",
|
||||
)
|
||||
assert blocked.status_code == 429
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
import time
|
||||
|
||||
from l4d2web.services.rate_limit import check_rate_limit
|
||||
|
||||
|
||||
def test_under_threshold_allows():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(3):
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
|
||||
|
||||
|
||||
def test_at_threshold_blocks():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(5):
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is False
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5) is True
|
||||
|
||||
|
||||
def test_other_ips_independent():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(5):
|
||||
check_rate_limit(bucket, "1.2.3.4", window=60.0, max_attempts=5)
|
||||
assert check_rate_limit(bucket, "5.6.7.8", window=60.0, max_attempts=5) is False
|
||||
|
||||
|
||||
def test_old_attempts_expire():
|
||||
bucket: dict[str, list[float]] = {}
|
||||
for _ in range(5):
|
||||
check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5)
|
||||
time.sleep(0.1)
|
||||
assert check_rate_limit(bucket, "1.2.3.4", window=0.05, max_attempts=5) is False
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
"""Routes for type='script' overlays: create, /script (update body),
|
||||
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
|
||||
from __future__ import annotations
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
|
@ -53,7 +52,6 @@ def _client_for(app, user_id: int):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return client
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
import json
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -33,7 +32,6 @@ def user_client_with_blueprints(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = payload["user_id"]
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
return client, payload
|
||||
|
|
@ -54,7 +52,6 @@ def test_servers_page_without_blueprints_shows_create_blueprint_cta(tmp_path, mo
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
response = client.get("/servers")
|
||||
|
|
@ -238,7 +235,6 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
|
|||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = alice_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
alice_resp = client.post(
|
||||
"/servers",
|
||||
|
|
@ -249,7 +245,6 @@ def test_create_server_allows_same_name_for_different_users(tmp_path, monkeypatc
|
|||
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = bob_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
bob_resp = client.post(
|
||||
"/servers",
|
||||
|
|
@ -323,7 +318,6 @@ def test_create_server_returns_409_when_port_range_exhausted(tmp_path, monkeypat
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
|
||||
first = client.post(
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
import pytest
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from l4d2web.app import create_app
|
||||
from l4d2web.auth import hash_password
|
||||
|
|
@ -33,7 +32,6 @@ def owner_client_with_server(tmp_path, monkeypatch):
|
|||
client = app.test_client()
|
||||
with client.session_transaction() as sess:
|
||||
sess["user_id"] = user_id
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
return client, server_id
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
"""Tests for the workshop overlay routes (add items, remove items, build,
|
||||
admin refresh)."""
|
||||
from __future__ import annotations
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from typing import Iterable
|
||||
from unittest.mock import patch
|
||||
|
|
@ -55,7 +54,6 @@ def env_user(tmp_path, monkeypatch):
|
|||
c = app.test_client()
|
||||
with c.session_transaction() as sess:
|
||||
sess["user_id"] = uid
|
||||
sess["pw_changed_at"] = datetime.now(UTC).isoformat()
|
||||
sess["csrf_token"] = "test-token"
|
||||
return c
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue