import json from flask import Blueprint, Response, redirect, render_template, request from sqlalchemy import select from l4d2web.auth import current_user, require_admin, require_login from l4d2web.db import session_scope from l4d2web.models import Blueprint as BlueprintModel from l4d2web.models import ( BlueprintOverlay, Job, Overlay, OverlayWorkshopItem, Server, User, WorkshopItem, ) from l4d2web.services.overlay_files import ( list_directory, safe_resolve_for_listing, ) bp = Blueprint("pages", __name__) @bp.get("/dashboard") @require_login def dashboard() -> str: return render_template("dashboard.html") @bp.get("/admin") @require_admin def admin_home() -> str: return render_template("admin.html") @bp.post("/admin/install") @require_admin def enqueue_runtime_install() -> Response: user = current_user() assert user is not None with session_scope() as db: db.add(Job(user_id=user.id, server_id=None, operation="install", state="queued")) return redirect("/admin/jobs") @bp.get("/admin/users") @require_admin def admin_users() -> str: with session_scope() as db: users = db.scalars(select(User).order_by(User.username)).all() return render_template("admin_users.html", users=users) @bp.get("/admin/jobs") @require_admin def admin_jobs() -> str: with session_scope() as db: rows = db.execute( select(Job, User, Server) .outerjoin(User, User.id == Job.user_id) .outerjoin(Server, Server.id == Job.server_id) .order_by(Job.created_at.desc()) ).all() return render_template("admin_jobs.html", rows=rows) @bp.get("/servers") @require_login def servers_page() -> str: user = current_user() assert user is not None with session_scope() as db: rows = db.execute( select(Server, BlueprintModel) .join(BlueprintModel, BlueprintModel.id == Server.blueprint_id) .where(Server.user_id == user.id) .order_by(Server.name) ).all() blueprints = db.scalars( select(BlueprintModel).where(BlueprintModel.user_id == user.id).order_by(BlueprintModel.name) ).all() prefill_blueprint_id: int | None = None raw_prefill = request.args.get("blueprint_id") if raw_prefill: try: candidate = int(raw_prefill) except ValueError: candidate = None if candidate is not None and any(b.id == candidate for b in blueprints): prefill_blueprint_id = candidate return render_template( "servers.html", rows=rows, blueprints=blueprints, prefill_blueprint_id=prefill_blueprint_id, ) _OPERATION_GERUND = { "start": "starting", "stop": "stopping", "reset": "resetting", "delete": "deleting", "initialize": "initializing", } _TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"} def _build_server_actions_context(db, server) -> dict: from l4d2web.services.timeago import humanize_delta latest_job = db.scalar( select(Job) .where(Job.server_id == server.id) .order_by(Job.created_at.desc()) .limit(1) ) if latest_job is not None: db.expunge(latest_job) actual_state = server.actual_state desired_state = server.desired_state active_operation = ( latest_job.operation if latest_job is not None and latest_job.state not in _TERMINAL_JOB_STATES else None ) has_active_job = active_operation is not None if has_active_job: display_state = _OPERATION_GERUND.get(active_operation, active_operation) + "…" state_class = "state-transient" elif actual_state == "running": display_state = "running" state_class = "state-running" elif actual_state == "stopped": display_state = "stopped" state_class = "state-stopped" else: display_state = actual_state or "unknown" state_class = "state-unknown" visible_buttons: list[str] = [] if not has_active_job: if actual_state == "running": visible_buttons.append("stop") else: visible_buttons.append("start") visible_buttons.append("reset") drift = (not has_active_job) and desired_state != actual_state latest_job_phrase: str | None = None latest_job_when: str | None = None latest_job_is_running = False if latest_job is not None: if latest_job.state in _TERMINAL_JOB_STATES: latest_job_phrase = f"{latest_job.operation} {latest_job.state}" ref_time = latest_job.finished_at or latest_job.created_at else: latest_job_phrase = _OPERATION_GERUND.get(latest_job.operation, latest_job.operation) latest_job_is_running = True ref_time = latest_job.started_at or latest_job.created_at latest_job_when = humanize_delta(ref_time) return { "display_state": display_state, "state_class": state_class, "visible_buttons": visible_buttons, "drift": drift, "latest_job": latest_job, "latest_job_phrase": latest_job_phrase, "latest_job_when": latest_job_when, "latest_job_is_running": latest_job_is_running, } @bp.get("/servers/") @require_login def server_detail(server_id: int): user = current_user() assert user is not None with session_scope() as db: server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id)) if server is None: return Response(status=404) blueprint = db.scalar(select(BlueprintModel).where(BlueprintModel.id == server.blueprint_id)) ctx = _build_server_actions_context(db, server) connect_host = request.host.split(":")[0] return render_template( "server_detail.html", server=server, blueprint=blueprint, connect_host=connect_host, **ctx, ) @bp.get("/servers//actions") @require_login def server_actions_fragment(server_id: int): user = current_user() assert user is not None with session_scope() as db: server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id)) if server is None: return Response(status=404) ctx = _build_server_actions_context(db, server) return render_template("_server_actions.html", server=server, **ctx) @bp.get("/servers//jobs") @require_login def server_jobs_page(server_id: int): user = current_user() assert user is not None with session_scope() as db: server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id)) if server is None: return Response(status=404) rows = db.execute( select(Job, User, Server) .outerjoin(User, User.id == Job.user_id) .outerjoin(Server, Server.id == Job.server_id) .where(Job.server_id == server.id) .order_by(Job.created_at.desc()) ).all() return render_template("server_jobs.html", server=server, rows=rows) @bp.get("/overlays") @require_login def overlays() -> str: user = current_user() assert user is not None with session_scope() as db: query = select(Overlay).order_by(Overlay.name) if not user.admin: query = query.where( Overlay.user_id.is_(None) | (Overlay.user_id == user.id) ) overlays = db.scalars(query).all() return render_template("overlays.html", overlays=overlays) @bp.get("/overlays//jobs") @require_login def overlay_jobs_page(overlay_id: int): user = current_user() assert user is not None with session_scope() as db: overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) if overlay is None: return Response(status=404) if not user.admin and overlay.user_id is not None and overlay.user_id != user.id: return Response(status=403) rows = db.execute( select(Job, User, Server) .outerjoin(User, User.id == Job.user_id) .outerjoin(Server, Server.id == Job.server_id) .where(Job.operation == "build_overlay", Job.overlay_id == overlay.id) .order_by(Job.created_at.desc()) ).all() return render_template("overlay_jobs.html", overlay=overlay, rows=rows) _BUILD_STATE_LABELS = { "ok": ("ok", "state-running"), "failed": ("failed", "state-stopped"), "": ("never built", "state-unknown"), } def _build_overlay_build_status_context(db, overlay) -> dict: from l4d2web.services.timeago import humanize_delta latest_build = db.scalar( select(Job) .where(Job.operation == "build_overlay", Job.overlay_id == overlay.id) .order_by(Job.created_at.desc()) .limit(1) ) if latest_build is not None: db.expunge(latest_build) is_running = ( latest_build is not None and latest_build.state not in _TERMINAL_JOB_STATES ) if is_running: build_state_label = "building…" build_state_class = "state-transient" else: build_state_label, build_state_class = _BUILD_STATE_LABELS.get( overlay.last_build_status or "", _BUILD_STATE_LABELS[""] ) latest_build_phrase: str | None = None latest_build_when: str | None = None if latest_build is not None: if latest_build.state in _TERMINAL_JOB_STATES: latest_build_phrase = f"{latest_build.operation} {latest_build.state}" ref_time = latest_build.finished_at or latest_build.created_at else: latest_build_phrase = "building" ref_time = latest_build.started_at or latest_build.created_at latest_build_when = humanize_delta(ref_time) return { "latest_build": latest_build, "latest_build_is_running": is_running, "latest_build_phrase": latest_build_phrase, "latest_build_when": latest_build_when, "build_state_label": build_state_label, "build_state_class": build_state_class, } @bp.get("/overlays/") @require_login def overlay_detail(overlay_id: int): user = current_user() assert user is not None with session_scope() as db: overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) if overlay is None: return Response(status=404) if not user.admin and overlay.user_id is not None and overlay.user_id != user.id: return Response(status=403) using_blueprints_query = ( select(BlueprintModel) .join(BlueprintOverlay, BlueprintOverlay.blueprint_id == BlueprintModel.id) .where(BlueprintOverlay.overlay_id == overlay.id) .order_by(BlueprintModel.name) ) if not user.admin: using_blueprints_query = using_blueprints_query.where(BlueprintModel.user_id == user.id) using_blueprints = db.scalars(using_blueprints_query).all() workshop_items = [] if overlay.type == "workshop": workshop_items = db.scalars( select(WorkshopItem) .join( OverlayWorkshopItem, OverlayWorkshopItem.workshop_item_id == WorkshopItem.id, ) .where(OverlayWorkshopItem.overlay_id == overlay.id) .order_by(WorkshopItem.created_at) ).all() build_ctx = _build_overlay_build_status_context(db, overlay) file_tree_root_entries, file_tree_truncated_count = _root_file_tree(overlay) return render_template( "overlay_detail.html", overlay=overlay, using_blueprints=using_blueprints, workshop_items=workshop_items, file_tree_root_entries=file_tree_root_entries, file_tree_truncated=file_tree_truncated_count > 0 if file_tree_root_entries is not None else False, file_tree_truncated_count=file_tree_truncated_count, **build_ctx, ) @bp.get("/overlays//build-status") @require_login def overlay_build_status_fragment(overlay_id: int): user = current_user() assert user is not None with session_scope() as db: overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) if overlay is None: return Response(status=404) if not user.admin and overlay.user_id is not None and overlay.user_id != user.id: return Response(status=403) ctx = _build_overlay_build_status_context(db, overlay) return render_template("_overlay_build_status.html", overlay=overlay, **ctx) def _root_file_tree(overlay: Overlay) -> tuple[list[dict] | None, int]: """Return (entries, truncated_count) for the overlay's runtime directory, or (None, 0) if the directory doesn't exist or the path is unresolvable (e.g. legacy absolute `overlay.path` values that pre-date the current `path == str(id)` convention).""" try: overlay_root = safe_resolve_for_listing(overlay.path, "") except ValueError: return None, 0 if not overlay_root.is_dir(): return None, 0 entries, truncated_count = list_directory(overlay_root, overlay_root) return entries, truncated_count @bp.get("/blueprints") @require_login def blueprints_page() -> str: user = current_user() assert user is not None with session_scope() as db: blueprints = db.scalars( select(BlueprintModel).where(BlueprintModel.user_id == user.id).order_by(BlueprintModel.name) ).all() return render_template("blueprints.html", blueprints=blueprints) @bp.get("/blueprints/") @require_login def blueprint_page(blueprint_id: int): user = current_user() assert user is not None with session_scope() as db: blueprint = db.scalar(select(BlueprintModel).where(BlueprintModel.id == blueprint_id)) if blueprint is None: return Response(status=404) if blueprint.user_id != user.id: return Response(status=403) selected_overlays = db.scalars( select(Overlay) .join(BlueprintOverlay, BlueprintOverlay.overlay_id == Overlay.id) .where(BlueprintOverlay.blueprint_id == blueprint.id) .order_by(BlueprintOverlay.position) ).all() position_rows = db.execute( select(BlueprintOverlay.overlay_id, BlueprintOverlay.position) .where(BlueprintOverlay.blueprint_id == blueprint.id) ).all() expose_rows = db.execute( select(BlueprintOverlay.overlay_id, BlueprintOverlay.expose_server_cfg) .where(BlueprintOverlay.blueprint_id == blueprint.id) ).all() all_overlays = db.scalars( select(Overlay) .where(Overlay.user_id.is_(None) | (Overlay.user_id == user.id)) .order_by(Overlay.name) ).all() overlay_positions = {overlay_id: position + 1 for overlay_id, position in position_rows} overlay_expose_state = {overlay_id: bool(expose) for overlay_id, expose in expose_rows} selected_ids = {overlay.id for overlay in selected_overlays} available_overlays = [overlay for overlay in all_overlays if overlay.id not in selected_ids] return render_template( "blueprint_detail.html", blueprint=blueprint, selected_overlays=selected_overlays, available_overlays=available_overlays, all_overlays=all_overlays, selected_overlay_ids=selected_ids, overlay_positions=overlay_positions, overlay_expose_state=overlay_expose_state, arguments=json.loads(blueprint.arguments), config_lines=json.loads(blueprint.config), )