left4me/l4d2web/routes/page_routes.py
mwiegand bcea450e98
admin: deactivate/activate/delete endpoints for /admin/users
Three new POST endpoints on the existing admin blueprint, all guarded
by @require_admin and CSRF (per the global before_request hook):

  /admin/users/<id>/deactivate  flips active=False (refuses self)
  /admin/users/<id>/activate    flips active=True
  /admin/users/<id>/delete      hard delete with safeties:
    - refuses self-delete
    - refuses delete-of-the-last-admin
    - refuses if the user owns Servers, Blueprints, or custom
      Overlays (operator deletes those first via existing UIs)
    - nulls out Job.user_id (jobs stay as audit trail; FK is nullable)

admin_users.html grows an Active column + an Actions column with the
appropriate button per row (none for self, Deactivate/Activate
toggle, Delete-with-confirmation modal). Modal pattern mirrors
blueprint_detail.html (same modal-close/modal-open data attrs,
csrf_token hidden field).

Refusal responses are 409 with a plain-text body (matches the
blueprint-in-use refusal at blueprint_routes.py:182). No flash
infrastructure introduced; consistent with the rest of the codebase.

All 367 existing tests still pass.
2026-05-10 21:15:52 +02:00

559 lines
19 KiB
Python

import json
from flask import Blueprint, Response, redirect, render_template, request
from sqlalchemy import func, select, update
from l4d2web.auth import current_user, require_admin, require_login
from l4d2web.db import session_scope
from l4d2web.models import Blueprint as BlueprintModel
from l4d2web.models import (
BlueprintOverlay,
Job,
Overlay,
OverlayWorkshopItem,
Server,
User,
WorkshopItem,
)
from l4d2web.services.overlay_files import (
list_directory,
safe_resolve_for_listing,
safe_resolve_for_server_listing,
)
# Flask blueprint named "pages"; every route in this module registers on it.
bp = Blueprint("pages", __name__)
@bp.get("/dashboard")
@require_login
def dashboard() -> str:
    """Render ``dashboard.html``; the route is guarded by ``@require_login``."""
    page = render_template("dashboard.html")
    return page
@bp.get("/admin")
@require_admin
def admin_home() -> str:
    """Render ``admin.html``; the route is guarded by ``@require_admin``."""
    page = render_template("admin.html")
    return page
@bp.post("/admin/install")
@require_admin
def enqueue_runtime_install() -> Response:
    """Add a queued ``install`` job for the acting user and redirect to ``/admin/jobs``.

    The job is not tied to any server (``server_id=None``).
    """
    actor = current_user()
    assert actor is not None
    install_job = Job(
        user_id=actor.id,
        server_id=None,
        operation="install",
        state="queued",
    )
    with session_scope() as db:
        db.add(install_job)
    return redirect("/admin/jobs")
@bp.get("/admin/users")
@require_admin
def admin_users() -> str:
    """Render ``admin_users.html`` with every user, ordered by username."""
    by_username = select(User).order_by(User.username)
    with session_scope() as db:
        all_users = db.scalars(by_username).all()
        # Render while the session is still open so the rows stay attached.
        return render_template("admin_users.html", users=all_users)
@bp.post("/admin/users/<int:user_id>/deactivate")
@require_admin
def admin_users_deactivate(user_id: int) -> Response:
    """Set ``active = False`` on the given user.

    Responses:
        409 (plain text) when the acting user targets their own account.
        404 when no user has this id.
        Redirect to ``/admin/users`` on success.
    """
    actor = current_user()
    assert actor is not None
    targets_self = actor.id == user_id
    if targets_self:
        return Response("cannot deactivate yourself", status=409)

    lookup = select(User).where(User.id == user_id)
    with session_scope() as db:
        account = db.scalar(lookup)
        if account is None:
            return Response(status=404)
        account.active = False
    return redirect("/admin/users")
@bp.post("/admin/users/<int:user_id>/activate")
@require_admin
def admin_users_activate(user_id: int) -> Response:
    """Set ``active = True`` on the given user.

    Responds 404 when no user has this id, otherwise redirects to
    ``/admin/users``.
    """
    lookup = select(User).where(User.id == user_id)
    with session_scope() as db:
        account = db.scalar(lookup)
        if account is None:
            return Response(status=404)
        account.active = True
    return redirect("/admin/users")
@bp.post("/admin/users/<int:user_id>/delete")
@require_admin
def admin_users_delete(user_id: int) -> Response:
    """Hard-delete a user unless one of the safety checks refuses.

    Refusals are 409 responses with a plain-text body:
        * the acting user targets their own account;
        * the target is an admin and no other admin row exists;
        * the target still owns servers, blueprints, or overlays.

    A missing user yields 404. On success, ``Job.user_id`` is set to NULL on
    the user's jobs, the user row is deleted, and the response redirects to
    ``/admin/users``.
    """
    actor = current_user()
    assert actor is not None
    if actor.id == user_id:
        return Response("cannot delete yourself", status=409)

    with session_scope() as db:

        def count_rows(column, *criteria) -> int:
            # COUNT(column) over rows matching all criteria; a NULL result counts as 0.
            return db.scalar(select(func.count(column)).where(*criteria)) or 0

        target = db.scalar(select(User).where(User.id == user_id))
        if target is None:
            return Response(status=404)

        if target.admin:
            other_admins = count_rows(User.id, User.admin.is_(True), User.id != user_id)
            if other_admins == 0:
                return Response("cannot delete the last admin", status=409)

        owned_servers = count_rows(Server.id, Server.user_id == user_id)
        owned_blueprints = count_rows(BlueprintModel.id, BlueprintModel.user_id == user_id)
        owned_overlays = count_rows(Overlay.id, Overlay.user_id == user_id)
        if owned_servers or owned_blueprints or owned_overlays:
            refusal = (
                f"user owns content: {owned_servers} server(s), "
                f"{owned_blueprints} blueprint(s), {owned_overlays} overlay(s) — "
                "delete those first"
            )
            return Response(refusal, status=409)

        # Jobs survive as an audit trail: detach them from the user instead of deleting.
        detach_jobs = update(Job).where(Job.user_id == user_id).values(user_id=None)
        db.execute(detach_jobs)
        db.delete(target)
    return redirect("/admin/users")
@bp.get("/admin/jobs")
@require_admin
def admin_jobs() -> str:
    """Render ``admin_jobs.html`` with all jobs, newest first.

    Each row is a ``(Job, User, Server)`` triple; user and server come from
    outer joins and may therefore be ``None``.
    """
    jobs_newest_first = (
        select(Job, User, Server)
        .outerjoin(User, User.id == Job.user_id)
        .outerjoin(Server, Server.id == Job.server_id)
        .order_by(Job.created_at.desc())
    )
    with session_scope() as db:
        job_rows = db.execute(jobs_newest_first).all()
        return render_template("admin_jobs.html", rows=job_rows)
@bp.get("/servers")
@require_login
def servers_page() -> str:
    """Render ``servers.html`` with the current user's servers and blueprints.

    An optional ``blueprint_id`` query parameter is passed to the template as
    ``prefill_blueprint_id``, but only when it parses as an integer and
    matches one of the user's own blueprints; otherwise ``None`` is passed.
    """
    user = current_user()
    assert user is not None

    servers_with_blueprint = (
        select(Server, BlueprintModel)
        .join(BlueprintModel, BlueprintModel.id == Server.blueprint_id)
        .where(Server.user_id == user.id)
        .order_by(Server.name)
    )
    own_blueprints = (
        select(BlueprintModel)
        .where(BlueprintModel.user_id == user.id)
        .order_by(BlueprintModel.name)
    )

    with session_scope() as db:
        rows = db.execute(servers_with_blueprint).all()
        blueprints = db.scalars(own_blueprints).all()

        prefill_blueprint_id: int | None = None
        requested = request.args.get("blueprint_id")
        if requested:
            try:
                requested_id = int(requested)
            except ValueError:
                requested_id = None
            # Only honour ids that belong to the user's own blueprints.
            owned_ids = {blueprint.id for blueprint in blueprints}
            if requested_id is not None and requested_id in owned_ids:
                prefill_blueprint_id = requested_id

        return render_template(
            "servers.html",
            rows=rows,
            blueprints=blueprints,
            prefill_blueprint_id=prefill_blueprint_id,
        )
_OPERATION_GERUND = {
"start": "starting",
"stop": "stopping",
"reset": "resetting",
"delete": "deleting",
"initialize": "initializing",
}
_TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
def _build_server_actions_context(db, server) -> dict:
    """Build the template context describing a server's state and actions.

    Looks up the most recently created job for ``server`` and derives:

    * ``display_state`` / ``state_class``: what to show as the current state.
      An active (non-terminal) latest job takes precedence over
      ``server.actual_state``.
    * ``visible_buttons``: which action buttons to offer.
    * ``drift``: true when no job is active and ``desired_state`` differs
      from ``actual_state``.
    * ``latest_job`` plus a human-readable phrase and relative time for it.

    Args:
        db: open database session used for the job lookup.
        server: the server row; ``id``, ``actual_state`` and ``desired_state``
            are read.

    Returns:
        A dict of keyword arguments for the server templates.
    """
    from l4d2web.services.timeago import humanize_delta

    # Most recently created job for this server, if any.
    latest_job = db.scalar(
        select(Job)
        .where(Job.server_id == server.id)
        .order_by(Job.created_at.desc())
        .limit(1)
    )
    if latest_job is not None:
        # Detach the job from the session; it is returned to the caller below.
        # NOTE(review): presumably so it stays readable after the session ends — confirm.
        db.expunge(latest_job)
    actual_state = server.actual_state
    desired_state = server.desired_state
    # The latest job only counts as "active" while its state is non-terminal.
    active_operation = (
        latest_job.operation
        if latest_job is not None and latest_job.state not in _TERMINAL_JOB_STATES
        else None
    )
    has_active_job = active_operation is not None
    if has_active_job:
        # NOTE(review): `+ ""` is a no-op; the overlay counterpart shows "building…",
        # so a trailing ellipsis may have been intended here — confirm.
        display_state = _OPERATION_GERUND.get(active_operation, active_operation) + ""
        state_class = "state-transient"
    elif actual_state == "running":
        display_state = "running"
        state_class = "state-running"
    elif actual_state == "stopped":
        display_state = "stopped"
        state_class = "state-stopped"
    else:
        display_state = actual_state or "unknown"
        state_class = "state-unknown"
    # No buttons are offered while a job is in flight.
    visible_buttons: list[str] = []
    if not has_active_job:
        if actual_state == "running":
            visible_buttons.append("stop")
        else:
            visible_buttons.append("start")
        # NOTE(review): as laid out here, "reset" is offered whenever no job is
        # active, whether the server is running or not — confirm this nesting.
        visible_buttons.append("reset")
    drift = (not has_active_job) and desired_state != actual_state
    latest_job_phrase: str | None = None
    latest_job_when: str | None = None
    latest_job_is_running = False
    if latest_job is not None:
        if latest_job.state in _TERMINAL_JOB_STATES:
            # Finished job: "<operation> <state>", timed from finish (or creation).
            latest_job_phrase = f"{latest_job.operation} {latest_job.state}"
            ref_time = latest_job.finished_at or latest_job.created_at
        else:
            # Unfinished job: gerund label, timed from start (or creation).
            latest_job_phrase = _OPERATION_GERUND.get(latest_job.operation, latest_job.operation)
            latest_job_is_running = True
            ref_time = latest_job.started_at or latest_job.created_at
        latest_job_when = humanize_delta(ref_time)
    return {
        "display_state": display_state,
        "state_class": state_class,
        "visible_buttons": visible_buttons,
        "drift": drift,
        "latest_job": latest_job,
        "latest_job_phrase": latest_job_phrase,
        "latest_job_when": latest_job_when,
        "latest_job_is_running": latest_job_is_running,
    }
def _connect_host_from(host: str) -> str:
    """Return ``host`` without an optional trailing ``:port``.

    A bracketed IPv6 literal such as ``[::1]:8080`` is returned as ``[::1]``.
    Splitting on the first colon would cut it down to ``[``. Any other value
    is cut at its first colon, exactly as before.
    """
    if host.startswith("["):
        closing = host.find("]")
        if closing != -1:
            return host[: closing + 1]
    return host.split(":")[0]


@bp.get("/servers/<int:server_id>")
@require_login
def server_detail(server_id: int):
    """Render ``server_detail.html`` for one of the current user's servers.

    Responds 404 when the server does not exist or belongs to another user.
    The template receives the server, its blueprint, the host name to connect
    to (the request's host without the port), the root listing of the server's
    file tree, and the state/actions context from
    ``_build_server_actions_context``.
    """
    user = current_user()
    assert user is not None
    with session_scope() as db:
        server = db.scalar(
            select(Server).where(Server.id == server_id, Server.user_id == user.id)
        )
        if server is None:
            return Response(status=404)
        blueprint = db.scalar(
            select(BlueprintModel).where(BlueprintModel.id == server.blueprint_id)
        )
        ctx = _build_server_actions_context(db, server)
        # Port-stripped request host; safe for bracketed IPv6 literals too.
        connect_host = _connect_host_from(request.host)
        file_tree_root_entries, file_tree_truncated_count = _root_server_file_tree(server_id)
        # "Truncated" is only meaningful when a listing exists at all.
        file_tree_truncated = (
            file_tree_truncated_count > 0 if file_tree_root_entries is not None else False
        )
        return render_template(
            "server_detail.html",
            server=server,
            blueprint=blueprint,
            connect_host=connect_host,
            file_tree_root_entries=file_tree_root_entries,
            file_tree_truncated=file_tree_truncated,
            file_tree_truncated_count=file_tree_truncated_count,
            **ctx,
        )
def _root_server_file_tree(server_id: int) -> tuple[list[dict] | None, int]:
    """List the root of `runtime/<server_id>/merged`.

    Returns ``(entries, truncated_count)``, or ``(None, 0)`` when the merged
    directory cannot be resolved or doesn't exist yet (server never started
    or just reset).
    """
    try:
        root = safe_resolve_for_server_listing(server_id, "")
    except ValueError:
        return None, 0
    else:
        if root is None:
            return None, 0
    listing, hidden_count = list_directory(root, root)
    return listing, hidden_count
@bp.get("/servers/<int:server_id>/actions")
@require_login
def server_actions_fragment(server_id: int):
    """Render the ``_server_actions.html`` fragment for one of the user's servers.

    Responds 404 when the server does not exist or belongs to another user.
    """
    viewer = current_user()
    assert viewer is not None
    owned_server = select(Server).where(
        Server.id == server_id,
        Server.user_id == viewer.id,
    )
    with session_scope() as db:
        server = db.scalar(owned_server)
        if server is None:
            return Response(status=404)
        actions_ctx = _build_server_actions_context(db, server)
        return render_template("_server_actions.html", server=server, **actions_ctx)
@bp.get("/servers/<int:server_id>/jobs")
@require_login
def server_jobs_page(server_id: int):
    """Render ``server_jobs.html`` with the jobs of one of the user's servers.

    Responds 404 when the server does not exist or belongs to another user.
    Rows are ``(Job, User, Server)`` triples, newest job first.
    """
    viewer = current_user()
    assert viewer is not None
    owned_server = select(Server).where(
        Server.id == server_id,
        Server.user_id == viewer.id,
    )
    with session_scope() as db:
        server = db.scalar(owned_server)
        if server is None:
            return Response(status=404)
        jobs_for_server = (
            select(Job, User, Server)
            .outerjoin(User, User.id == Job.user_id)
            .outerjoin(Server, Server.id == Job.server_id)
            .where(Job.server_id == server.id)
            .order_by(Job.created_at.desc())
        )
        job_rows = db.execute(jobs_for_server).all()
        return render_template("server_jobs.html", server=server, rows=job_rows)
@bp.get("/overlays")
@require_login
def overlays() -> str:
    """Render ``overlays.html`` with the overlays visible to the current user.

    Admins see every overlay. Other users see overlays without an owner
    (``user_id IS NULL``) plus their own. Results are ordered by name.
    """
    user = current_user()
    assert user is not None

    visible_query = select(Overlay).order_by(Overlay.name)
    if not user.admin:
        ownerless = Overlay.user_id.is_(None)
        own = Overlay.user_id == user.id
        visible_query = visible_query.where(ownerless | own)

    with session_scope() as db:
        visible_overlays = db.scalars(visible_query).all()
        return render_template("overlays.html", overlays=visible_overlays)
@bp.get("/overlays/<int:overlay_id>/jobs")
@require_login
def overlay_jobs_page(overlay_id: int):
    """Render ``overlay_jobs.html`` with the ``build_overlay`` jobs of an overlay.

    Responds 404 when the overlay does not exist, and 403 when a non-admin
    requests an overlay owned by somebody else. Ownerless overlays are open
    to everyone.
    """
    user = current_user()
    assert user is not None
    with session_scope() as db:
        overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
        if overlay is None:
            return Response(status=404)

        owned_by_other = overlay.user_id is not None and overlay.user_id != user.id
        if owned_by_other and not user.admin:
            return Response(status=403)

        build_jobs = (
            select(Job, User, Server)
            .outerjoin(User, User.id == Job.user_id)
            .outerjoin(Server, Server.id == Job.server_id)
            .where(Job.operation == "build_overlay", Job.overlay_id == overlay.id)
            .order_by(Job.created_at.desc())
        )
        job_rows = db.execute(build_jobs).all()
        return render_template("overlay_jobs.html", overlay=overlay, rows=job_rows)
_BUILD_STATE_LABELS = {
"ok": ("ok", "state-running"),
"failed": ("failed", "state-stopped"),
"": ("never built", "state-unknown"),
}
def _build_overlay_build_status_context(db, overlay) -> dict:
    """Build the template context describing an overlay's build status.

    Looks up the most recently created ``build_overlay`` job for ``overlay``.
    While that job is in a non-terminal state the label is "building…".
    Otherwise the label comes from ``overlay.last_build_status`` via
    ``_BUILD_STATE_LABELS``.

    Args:
        db: open database session used for the job lookup.
        overlay: the overlay row; ``id`` and ``last_build_status`` are read.

    Returns:
        A dict of keyword arguments for the overlay templates.
    """
    from l4d2web.services.timeago import humanize_delta

    newest_build_query = (
        select(Job)
        .where(Job.operation == "build_overlay", Job.overlay_id == overlay.id)
        .order_by(Job.created_at.desc())
        .limit(1)
    )
    latest_build = db.scalar(newest_build_query)
    if latest_build is not None:
        # Detach the job from the session; it is returned to the caller below.
        db.expunge(latest_build)

    is_running = (
        latest_build is not None
        and latest_build.state not in _TERMINAL_JOB_STATES
    )

    if not is_running:
        status_key = overlay.last_build_status or ""
        build_state_label, build_state_class = _BUILD_STATE_LABELS.get(
            status_key, _BUILD_STATE_LABELS[""]
        )
    else:
        build_state_label = "building…"
        build_state_class = "state-transient"

    phrase: str | None = None
    when: str | None = None
    if latest_build is not None:
        finished = latest_build.state in _TERMINAL_JOB_STATES
        if finished:
            phrase = f"{latest_build.operation} {latest_build.state}"
            ref_time = latest_build.finished_at or latest_build.created_at
        else:
            phrase = "building"
            ref_time = latest_build.started_at or latest_build.created_at
        when = humanize_delta(ref_time)

    return {
        "latest_build": latest_build,
        "latest_build_is_running": is_running,
        "latest_build_phrase": phrase,
        "latest_build_when": when,
        "build_state_label": build_state_label,
        "build_state_class": build_state_class,
    }
@bp.get("/overlays/<int:overlay_id>")
@require_login
def overlay_detail(overlay_id: int):
    """Render ``overlay_detail.html`` for one overlay.

    Responds 404 when the overlay does not exist, and 403 when a non-admin
    requests an overlay owned by somebody else. The template receives the
    blueprints using the overlay (only the user's own unless admin), the
    workshop items for ``workshop``-type overlays, the root file listing,
    and the build-status context.
    """
    user = current_user()
    assert user is not None
    with session_scope() as db:
        overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
        if overlay is None:
            return Response(status=404)

        owned_by_other = overlay.user_id is not None and overlay.user_id != user.id
        if owned_by_other and not user.admin:
            return Response(status=403)

        # Blueprints referencing this overlay; non-admins only see their own.
        users_of_overlay = (
            select(BlueprintModel)
            .join(BlueprintOverlay, BlueprintOverlay.blueprint_id == BlueprintModel.id)
            .where(BlueprintOverlay.overlay_id == overlay.id)
            .order_by(BlueprintModel.name)
        )
        if not user.admin:
            users_of_overlay = users_of_overlay.where(BlueprintModel.user_id == user.id)
        using_blueprints = db.scalars(users_of_overlay).all()

        workshop_items = []
        if overlay.type == "workshop":
            items_query = (
                select(WorkshopItem)
                .join(
                    OverlayWorkshopItem,
                    OverlayWorkshopItem.workshop_item_id == WorkshopItem.id,
                )
                .where(OverlayWorkshopItem.overlay_id == overlay.id)
                .order_by(WorkshopItem.created_at)
            )
            workshop_items = db.scalars(items_query).all()

        build_ctx = _build_overlay_build_status_context(db, overlay)
        root_entries, truncated_count = _root_file_tree(overlay)
        # "Truncated" is only meaningful when a listing exists at all.
        if root_entries is not None:
            truncated = truncated_count > 0
        else:
            truncated = False

        return render_template(
            "overlay_detail.html",
            overlay=overlay,
            using_blueprints=using_blueprints,
            workshop_items=workshop_items,
            file_tree_root_entries=root_entries,
            file_tree_truncated=truncated,
            file_tree_truncated_count=truncated_count,
            **build_ctx,
        )
@bp.get("/overlays/<int:overlay_id>/build-status")
@require_login
def overlay_build_status_fragment(overlay_id: int):
    """Render the ``_overlay_build_status.html`` fragment for one overlay.

    Responds 404 when the overlay does not exist, and 403 when a non-admin
    requests an overlay owned by somebody else.
    """
    user = current_user()
    assert user is not None
    with session_scope() as db:
        overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
        if overlay is None:
            return Response(status=404)

        owned_by_other = overlay.user_id is not None and overlay.user_id != user.id
        if owned_by_other and not user.admin:
            return Response(status=403)

        status_ctx = _build_overlay_build_status_context(db, overlay)
        return render_template("_overlay_build_status.html", overlay=overlay, **status_ctx)
def _root_file_tree(overlay: Overlay) -> tuple[list[dict] | None, int]:
    """List the root of the overlay's runtime directory.

    Returns ``(entries, truncated_count)``, or ``(None, 0)`` if the directory
    doesn't exist or the path is unresolvable (e.g. legacy absolute
    `overlay.path` values that pre-date the current `path == str(id)`
    convention).
    """
    try:
        root = safe_resolve_for_listing(overlay.path, "")
    except ValueError:
        return None, 0
    else:
        if not root.is_dir():
            return None, 0
    listing, hidden_count = list_directory(root, root)
    return listing, hidden_count
@bp.get("/blueprints")
@require_login
def blueprints_page() -> str:
    """Render ``blueprints.html`` with the current user's blueprints, ordered by name."""
    owner = current_user()
    assert owner is not None
    own_blueprints = (
        select(BlueprintModel)
        .where(BlueprintModel.user_id == owner.id)
        .order_by(BlueprintModel.name)
    )
    with session_scope() as db:
        found = db.scalars(own_blueprints).all()
        return render_template("blueprints.html", blueprints=found)
@bp.get("/blueprints/<int:blueprint_id>")
@require_login
def blueprint_page(blueprint_id: int):
    """Render ``blueprint_detail.html`` for one of the current user's blueprints.

    Responds 404 when the blueprint does not exist and 403 when it belongs to
    another user. The template receives the overlays attached to the
    blueprint (in position order), the overlays that could still be attached
    (ownerless or owned by the user), 1-based positions, per-overlay
    ``expose_server_cfg`` flags, and the JSON-decoded ``arguments`` and
    ``config`` columns.
    """
    user = current_user()
    assert user is not None
    with session_scope() as db:
        blueprint = db.scalar(
            select(BlueprintModel).where(BlueprintModel.id == blueprint_id)
        )
        if blueprint is None:
            return Response(status=404)
        if blueprint.user_id != user.id:
            return Response(status=403)

        selected_overlays = db.scalars(
            select(Overlay)
            .join(BlueprintOverlay, BlueprintOverlay.overlay_id == Overlay.id)
            .where(BlueprintOverlay.blueprint_id == blueprint.id)
            .order_by(BlueprintOverlay.position)
        ).all()
        position_rows = db.execute(
            select(BlueprintOverlay.overlay_id, BlueprintOverlay.position).where(
                BlueprintOverlay.blueprint_id == blueprint.id
            )
        ).all()
        expose_rows = db.execute(
            select(BlueprintOverlay.overlay_id, BlueprintOverlay.expose_server_cfg).where(
                BlueprintOverlay.blueprint_id == blueprint.id
            )
        ).all()
        all_overlays = db.scalars(
            select(Overlay)
            .where(Overlay.user_id.is_(None) | (Overlay.user_id == user.id))
            .order_by(Overlay.name)
        ).all()

        # Stored positions are shifted by one for display.
        overlay_positions = {}
        for linked_id, stored_position in position_rows:
            overlay_positions[linked_id] = stored_position + 1

        overlay_expose_state = {}
        for linked_id, exposed in expose_rows:
            overlay_expose_state[linked_id] = bool(exposed)

        selected_ids = {chosen.id for chosen in selected_overlays}
        available_overlays = [
            candidate for candidate in all_overlays if candidate.id not in selected_ids
        ]

        return render_template(
            "blueprint_detail.html",
            blueprint=blueprint,
            selected_overlays=selected_overlays,
            available_overlays=available_overlays,
            all_overlays=all_overlays,
            selected_overlay_ids=selected_ids,
            overlay_positions=overlay_positions,
            overlay_expose_state=overlay_expose_state,
            arguments=json.loads(blueprint.arguments),
            config_lines=json.loads(blueprint.config),
        )