left4me/l4d2web/l4d2web/routes/server_routes.py
mwiegand 18113637e9
refactor(datetime): introduce UtcDateTime, remove naive-strip workarounds
Adds a UtcDateTime TypeDecorator (models.py) that enforces aware-UTC on
write and stamps tzinfo=UTC on read. Replaces 26 DateTime column
declarations. Removes 5 production sites that defensively stripped tzinfo
to match SQLite's lossy round-trip. auth.py now coerces legacy session
cookies upward (stamp UTC on parsed naive marker) instead of stripping
live aware markers downward.

The change is Python-side only: UtcDateTime.impl = DateTime, so DDL and
emitted SQL are unchanged. No Alembic migration needed.

Adds 2 unit tests in test_models.py pinning the decorator's contract
independently of the column declarations.

The three deliberately-naive test_timeago.py fixtures (lines 67, 73, 113)
remain naive on purpose -- they exercise _ensure_utc's normalize-up path
at the public filter boundary, which stays as belt-and-braces defense.

See docs/superpowers/specs/2026-05-16-tz-aware-datetime-design.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 11:59:29 +02:00

274 lines
8.9 KiB
Python

import secrets
from datetime import UTC, datetime, timedelta
from flask import Blueprint, Response, current_app, jsonify, redirect, render_template, request
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from l4d2web.auth import current_user, require_login
from l4d2web.db import session_scope
from l4d2web.models import Blueprint as BlueprintModel
from l4d2web.models import Job, Server, ServerLiveState, ServerPlayerSession, SteamUserProfile
# Flask blueprint named "server"; the route handlers below register on it
# through the ``@bp.<method>`` decorators.
bp = Blueprint("server", __name__)
_NAME_MAX_LENGTH = 128
def _validate_display_name(raw: object) -> str:
if not isinstance(raw, str):
raise ValueError("name must be a string")
cleaned = raw.strip()
if not cleaned:
raise ValueError("name must not be empty")
if len(cleaned) > _NAME_MAX_LENGTH:
raise ValueError("name too long")
return cleaned
def _allocate_next_port(db) -> int | None:
    """Pick the lowest unused port inside the configured port range.

    The range bounds come from the ``PORT_RANGE_START`` and ``PORT_RANGE_END``
    entries of the current Flask app config and are treated as inclusive.

    Args:
        db: Database session used to look up ports already stored on
            ``Server`` rows.

    Returns:
        The first port in the range that no ``Server`` row uses, or ``None``
        when every port in the range is taken.
    """
    first = int(current_app.config["PORT_RANGE_START"])
    last = int(current_app.config["PORT_RANGE_END"])
    # Only ports inside the range matter, so restrict the lookup to them.
    ports_in_range = select(Server.port).where(Server.port >= first, Server.port <= last)
    taken = set(db.scalars(ports_in_range).all())
    free_ports = (candidate for candidate in range(first, last + 1) if candidate not in taken)
    return next(free_ports, None)
def _resolve_port(payload, db) -> tuple[int | None, Response | None]:
    """Work out which port a new server should use.

    Args:
        payload: Mapping-like request data (JSON object or form) that may
            carry a ``port`` entry.
        db: Database session, used only when a port has to be auto-allocated.

    Returns:
        ``(port, None)`` on success, or ``(None, error_response)`` where the
        response is a 409 when no free port is left for auto-allocation and a
        400 when the supplied value is not a usable port number.
    """
    raw = payload.get("port")
    # A missing or blank value means "choose one for me".
    if raw is None or (isinstance(raw, str) and raw.strip() == ""):
        port = _allocate_next_port(db)
        if port is None:
            return None, Response("no free port available", status=409)
        return port, None
    # bool is a subclass of int, so a JSON ``true`` would otherwise be
    # accepted silently as port 1.
    if isinstance(raw, bool):
        return None, Response("invalid port", status=400)
    try:
        port = int(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers float infinity, which the json module parses
        # from the non-standard ``Infinity`` literal.
        return None, Response("invalid port", status=400)
    # Refuse fractional numbers rather than truncating them to a different port.
    if isinstance(raw, float) and raw != port:
        return None, Response("invalid port", status=400)
    if not 1 <= port <= 65535:
        return None, Response("invalid port", status=400)
    return port, None
@bp.post("/servers")
@require_login
def create_server() -> Response:
    """Create a server for the logged-in user.

    Reads ``name``, ``blueprint_id`` and an optional ``port`` from the JSON
    body (when the request is JSON) or from the form data otherwise.

    Returns:
        * 400 when the name, blueprint id or port is missing or malformed.
        * 404 when the blueprint does not exist or belongs to another user.
        * 409 when no free port is available, or the name or port collides
          with an existing row.
        * 201 with ``{"id": ...}`` for JSON requests, otherwise a redirect to
          the new server's page.
    """
    user = current_user()
    assert user is not None
    json_response = request.is_json
    payload = request.get_json(silent=True) if json_response else request.form
    try:
        name = _validate_display_name(payload["name"])
    except (KeyError, TypeError, ValueError):
        return Response("invalid server name", status=400)
    # Validate the blueprint id before touching the database so a missing or
    # non-numeric value becomes a 400 instead of an unhandled exception.
    try:
        blueprint_id = int(payload["blueprint_id"])
    except (KeyError, TypeError, ValueError, OverflowError):
        return Response("invalid blueprint id", status=400)
    with session_scope() as db:
        # Scope the lookup to the current user so foreign blueprints look absent.
        blueprint = db.scalar(
            select(BlueprintModel).where(
                BlueprintModel.id == blueprint_id,
                BlueprintModel.user_id == user.id,
            )
        )
        if blueprint is None:
            return Response("blueprint not found", status=404)
        port, error = _resolve_port(payload, db)
        if error is not None:
            return error
        server = Server(
            user_id=user.id,
            blueprint_id=blueprint.id,
            name=name,
            port=port,
            desired_state="stopped",
            actual_state="unknown",
            last_error="",
            rcon_password=secrets.token_urlsafe(32),
        )
        db.add(server)
        try:
            # Flush now so constraint violations surface here and can be
            # turned into a 409.
            db.flush()
        except IntegrityError as exc:
            db.rollback()
            detail = str(exc.orig) if exc.orig is not None else str(exc)
            # The driver message is inspected to tell a name collision apart;
            # any other integrity failure is reported as a port collision.
            if "servers.name" in detail:
                return Response("name already in use", status=409)
            return Response("port already in use", status=409)
        # Read the generated id while the session is still open.
        server_id = server.id
    if json_response:
        return jsonify({"id": server_id}), 201
    return redirect(f"/servers/{server_id}")
@bp.post("/servers/<int:server_id>")
@require_login
def update_server_form(server_id: int) -> Response:
    """Apply a form submission that renames a server and optionally sets its hostname.

    Args:
        server_id: Id of the server to update; it must belong to the
            logged-in user.

    Returns:
        * 400 when the submitted ``name`` fails validation.
        * 404 when the user has no server with that id.
        * 409 when the flush fails with an integrity error whose message
          mentions both ``servers`` and ``name``.
        * Otherwise a redirect to the server's page.

    Raises:
        IntegrityError: Re-raised when the integrity failure is not
            recognised as a name collision.
    """
    user = current_user()
    assert user is not None
    form = request.form
    try:
        new_name = _validate_display_name(form.get("name", ""))
    except ValueError:
        return Response("invalid server name", status=400)

    with session_scope() as db:
        owned_server = select(Server).where(Server.id == server_id, Server.user_id == user.id)
        server = db.scalar(owned_server)
        if server is None:
            return Response(status=404)

        server.name = new_name
        # The hostname is only touched when the form actually submitted the field.
        new_hostname = form.get("hostname")
        if new_hostname is not None:
            server.hostname = new_hostname

        try:
            db.flush()
        except IntegrityError as exc:
            db.rollback()
            detail = str(exc) if exc.orig is None else str(exc.orig)
            name_collision = "servers" in detail and "name" in detail
            if not name_collision:
                raise
            return Response("name already in use", status=409)
    return redirect(f"/servers/{server_id}")
@bp.patch("/servers/<int:server_id>")
@require_login
def update_server(server_id: int) -> Response:
    """Point one of the user's servers at a different blueprint.

    Expects a JSON object carrying ``blueprint_id``.

    Args:
        server_id: Id of the server to update; it must belong to the
            logged-in user.

    Returns:
        * 400 when ``blueprint_id`` is missing or not an integer, or the body
          is not a JSON object.
        * 404 when the server or the blueprint is not found for this user.
        * 200 with ``{"id": server_id}`` on success.
    """
    user = current_user()
    assert user is not None
    payload = request.get_json(silent=True) or {}
    # Validate before querying so a malformed body becomes a 400 instead of an
    # unhandled KeyError/TypeError/ValueError.
    try:
        blueprint_id = int(payload["blueprint_id"])
    except (KeyError, TypeError, ValueError, OverflowError):
        return Response("invalid blueprint id", status=400)
    with session_scope() as db:
        server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id))
        if server is None:
            return Response(status=404)
        # Only blueprints owned by the same user may be attached.
        blueprint = db.scalar(
            select(BlueprintModel).where(
                BlueprintModel.id == blueprint_id,
                BlueprintModel.user_id == user.id,
            )
        )
        if blueprint is None:
            return Response("blueprint not found", status=404)
        server.blueprint_id = blueprint.id
    return jsonify({"id": server_id}), 200
# Operation names accepted by the ``/servers/<id>/<operation>`` endpoint.
LIFECYCLE_OPERATIONS = {"initialize", "start", "stop", "delete", "reset"}


@bp.post("/servers/<int:server_id>/<operation>")
@require_login
def enqueue_server_operation(server_id: int, operation: str) -> Response:
    """Queue a lifecycle job for one of the user's servers.

    Adds a ``Job`` row in state ``"queued"`` and, for some operations, also
    updates the server's ``desired_state``.

    Args:
        server_id: Id of the target server; it must belong to the logged-in user.
        operation: Must be one of ``LIFECYCLE_OPERATIONS``.

    Returns:
        404 for an unknown operation or a server the user does not own.
        Otherwise a redirect to the server list after ``delete``, or to the
        server's page after any other operation.
    """
    user = current_user()
    assert user is not None
    if operation not in LIFECYCLE_OPERATIONS:
        return Response(status=404)

    # Desired state implied by each operation; operations not listed here
    # (``initialize``) leave it untouched.
    desired_state_for = {
        "start": "running",
        "stop": "stopped",
        "delete": "stopped",
        "reset": "stopped",
    }

    with session_scope() as db:
        owned_server = select(Server).where(Server.id == server_id, Server.user_id == user.id)
        server = db.scalar(owned_server)
        if server is None:
            return Response(status=404)
        db.add(Job(user_id=user.id, server_id=server.id, operation=operation, state="queued"))
        new_state = desired_state_for.get(operation)
        if new_state is not None:
            server.desired_state = new_state

    target = "/servers" if operation == "delete" else f"/servers/{server_id}"
    return redirect(target)
@bp.get("/servers/<int:server_id>/live-state")
@require_login
def live_state_fragment(server_id: int) -> Response:
    """Render the ``_live_state.html`` fragment for one of the user's servers.

    The template receives:

    * ``snapshot``: the ``ServerLiveState`` row with the latest ``started_at``.
    * ``snapshot_fresh``: whether that row's ``last_seen_at`` is within
      ``LIVE_STATE_STALE_SECONDS`` (default 30) of now.
    * ``current_players``: open player sessions (``left_at`` is NULL), each
      paired with its Steam profile when one exists, ordered by join time.
    * ``recent_players``: up to 20 players who left within
      ``LIVE_STATE_HISTORY_DAYS`` (default 30) and are not currently
      connected, most recently seen first.
    * ``poll_seconds``: ``LIVE_STATE_POLL_SECONDS`` (default 5), at least 1.

    Args:
        server_id: Id of the server; it must belong to the logged-in user.

    Returns:
        404 when the user has no such server, otherwise the rendered fragment.
    """
    user = current_user()
    assert user is not None
    with session_scope() as db:
        owned_server = select(Server).where(
            Server.id == server_id,
            Server.user_id == user.id,
        )
        server = db.scalar(owned_server)
        if server is None:
            return Response(status=404)

        stale_seconds = current_app.config.get("LIVE_STATE_STALE_SECONDS", 30)
        cutoff = datetime.now(UTC) - timedelta(seconds=stale_seconds)

        latest_stmt = (
            select(ServerLiveState)
            .where(ServerLiveState.server_id == server.id)
            .order_by(ServerLiveState.started_at.desc())
            .limit(1)
        )
        latest = db.scalar(latest_stmt)

        # Both player queries attach the optional Steam profile the same way.
        profile_match = SteamUserProfile.steam_id_64 == ServerPlayerSession.steam_id_64

        online_stmt = (
            select(ServerPlayerSession, SteamUserProfile)
            .outerjoin(SteamUserProfile, profile_match)
            .where(
                ServerPlayerSession.server_id == server.id,
                ServerPlayerSession.left_at.is_(None),
            )
            .order_by(ServerPlayerSession.joined_at)
        )
        current_rows = db.execute(online_stmt).all()
        online_ids = [player_session.steam_id_64 for player_session, _profile in current_rows]

        history_days = current_app.config.get("LIVE_STATE_HISTORY_DAYS", 30)
        recent_cutoff = datetime.now(UTC) - timedelta(days=history_days)

        # Leave connected players out of the "recent" list; with nobody
        # connected there is nothing to exclude.
        if online_ids:
            not_online = ~ServerPlayerSession.steam_id_64.in_(online_ids)
        else:
            not_online = True

        last_left = func.max(ServerPlayerSession.left_at)
        recent_stmt = (
            select(
                ServerPlayerSession.steam_id_64,
                last_left.label("last_seen"),
                ServerPlayerSession.name_at_join,
                SteamUserProfile.persona_name,
                SteamUserProfile.avatar_url,
            )
            .outerjoin(SteamUserProfile, profile_match)
            .where(
                ServerPlayerSession.server_id == server.id,
                ServerPlayerSession.left_at.is_not(None),
                ServerPlayerSession.left_at >= recent_cutoff,
                not_online,
            )
            .group_by(
                ServerPlayerSession.steam_id_64,
                SteamUserProfile.persona_name,
                SteamUserProfile.avatar_url,
                ServerPlayerSession.name_at_join,
            )
            .order_by(last_left.desc())
            .limit(20)
        )
        recent_rows = db.execute(recent_stmt).all()

        is_fresh = latest is not None and latest.last_seen_at >= cutoff
        poll_interval = max(1, int(current_app.config.get("LIVE_STATE_POLL_SECONDS", 5)))
        # Rendering happens while the session is still open.
        return render_template(
            "_live_state.html",
            server=server,
            snapshot=latest,
            snapshot_fresh=is_fresh,
            current_players=current_rows,
            recent_players=recent_rows,
            poll_seconds=poll_interval,
        )