left4me/l4d2web/l4d2web/auth.py
mwiegand 18113637e9
refactor(datetime): introduce UtcDateTime, remove naive-strip workarounds
Adds a UtcDateTime TypeDecorator (models.py) that enforces aware-UTC on
write and stamps tzinfo=UTC on read. Replaces 26 DateTime column
declarations. Removes 5 production sites that defensively stripped tzinfo
to match SQLite's lossy round-trip. auth.py now coerces legacy session
cookies upward (stamp UTC on parsed naive marker) instead of stripping
live aware markers downward.

The change is Python-side only: UtcDateTime.impl = DateTime, so DDL and
emitted SQL are unchanged. No Alembic migration needed.

Adds 2 unit tests in test_models.py pinning the decorator's contract
independently of the column declarations.

The three deliberately-naive test_timeago.py fixtures (lines 67, 73, 113)
remain naive on purpose -- they exercise _ensure_utc's normalize-up path
at the public filter boundary, which stays as belt-and-braces defense.

See docs/superpowers/specs/2026-05-16-tz-aware-datetime-design.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 11:59:29 +02:00

144 lines
4 KiB
Python

from datetime import UTC, datetime
from functools import wraps
from typing import Callable, TypeVar
from urllib.parse import quote, unquote
from flask import abort, g, redirect, request, session
from sqlalchemy import select
from werkzeug.security import check_password_hash, generate_password_hash
from l4d2web.db import session_scope
from l4d2web.models import User
# Type variable bound to callables. The decorators in this module
# (require_login, require_admin) are annotated ``(func: F) -> F`` so that a
# decorated view keeps its original type for static checkers.
F = TypeVar("F", bound=Callable)
def hash_password(raw: str) -> str:
    """Return the werkzeug password hash for the plaintext ``raw``."""
    digest = generate_password_hash(raw)
    return digest
def verify_password(raw: str, digest: str) -> bool:
    """Return whether plaintext ``raw`` matches the stored hash ``digest``."""
    # check_password_hash takes the stored hash first, the candidate second.
    is_match = check_password_hash(digest, raw)
    return is_match
MIN_PASSWORD_LENGTH = 8
def validate_new_password(raw: str) -> str | None:
if raw == "":
return "password must not be empty"
if len(raw) < MIN_PASSWORD_LENGTH:
return f"password must be at least {MIN_PASSWORD_LENGTH} characters"
return None
def _resolve_session_user() -> User | None:
    """Return the user the current session authenticates, or ``None``."""
    raw_id = session.get("user_id")
    if raw_id is None:
        return None

    with session_scope() as db:
        account = db.scalar(select(User).where(User.id == int(raw_id)))
        if account is None or not account.active:
            return None

        stamp = session.get("pw_changed_at")
        if stamp is None:
            return None
        try:
            issued_at = datetime.fromisoformat(stamp)
        except ValueError:
            return None

        # Sessions created before the UtcDateTime migration stored a naive
        # ISO string. Tag those as UTC so they can be compared with the
        # timezone-aware value read from the database. This stays in place
        # permanently: there is no reliable way to know when the last naive
        # cookie has expired.
        if issued_at.tzinfo is None:
            issued_at = issued_at.replace(tzinfo=UTC)
        if issued_at < account.password_changed_at:
            return None

        # A role change after login invalidates the session, so a demoted
        # admin does not stay admin until their next password change. A
        # session without the "admin" key predates this check; it is
        # accepted and gets the key on the next login.
        admin_flag = session.get("admin")
        if admin_flag is not None and bool(admin_flag) != bool(account.admin):
            return None

        return account


def load_current_user() -> None:
    """Populate ``g.user`` for the current request from the session.

    ``g.user`` is set to the matching ``User`` only when all of these hold,
    and to ``None`` otherwise:

    * the session has a ``user_id`` that resolves to an active user;
    * the session's ``pw_changed_at`` marker parses as an ISO timestamp and
      is not older than the user's ``password_changed_at``;
    * the session's ``admin`` flag, if present, equals the user's current
      admin status.
    """
    g.user = _resolve_session_user()
def current_user() -> User | None:
    """Return ``g.user``, or ``None`` when it has not been set on ``g``."""
    try:
        return g.user
    except AttributeError:
        return None
def login_user(user_id: int, password_changed_at, admin: bool) -> None:
    """Start a fresh authenticated session for ``user_id``.

    Stores the user's id, the ISO form of ``password_changed_at`` and the
    admin flag in the session; ``load_current_user`` validates against
    these on later requests.
    """
    # Wipe whatever was in the session before login so a fixated cookie
    # cannot carry data across the login boundary. The csrf_protect hook is
    # expected to mint a new csrf_token on the next request.
    session.clear()
    session.update(
        {
            "user_id": user_id,
            "pw_changed_at": password_changed_at.isoformat(),
            "admin": bool(admin),
        }
    )
def logout_user() -> None:
    """End the current session by removing everything stored in it."""
    session.clear()
def is_safe_next(target: str | None) -> bool:
if not target:
return False
if not target.startswith("/"):
return False
if target.startswith("//"):
return False
if "://" in target:
return False
if "\\" in target:
return False
decoded_target = unquote(target)
if decoded_target.startswith("//"):
return False
if "://" in decoded_target:
return False
if "\\" in decoded_target:
return False
return True
def login_redirect_for_current_request():
    """Redirect to ``/login``, remembering the current URL when it is safe.

    The current request's ``full_path`` (with any trailing ``?`` removed) is
    passed along as the ``next`` query parameter if ``is_safe_next`` accepts
    it; otherwise the redirect goes to the bare login page.
    """
    here = request.full_path.rstrip("?")
    if not is_safe_next(here):
        return redirect("/login")
    # Keep "/" readable; everything else (including "?" and "&") is escaped
    # so the whole path survives as a single query value.
    encoded = quote(here, safe="/")
    return redirect(f"/login?next={encoded}")
def require_login(func: F) -> F:
    """Decorator: run ``func`` only when a user is logged in.

    Anonymous requests get the response from
    ``login_redirect_for_current_request`` instead of calling ``func``.
    """

    @wraps(func)
    def guarded(*args, **kwargs):
        if current_user() is not None:
            return func(*args, **kwargs)
        return login_redirect_for_current_request()

    return guarded  # type: ignore[return-value]
def require_admin(func: F) -> F:
    """Decorator: run ``func`` only for a logged-in admin user.

    Anonymous requests are sent to the login page; logged-in users without
    the admin flag are rejected with HTTP 403.
    """

    @wraps(func)
    def guarded(*args, **kwargs):
        viewer = current_user()
        if viewer is None:
            return login_redirect_for_current_request()
        if viewer.admin:
            return func(*args, **kwargs)
        abort(403)

    return guarded  # type: ignore[return-value]