- POST /servers/<id>/console runs a command via rcon.execute_command and persists every outcome (success / empty / error) to command_history. - GET /servers/<id>/console/history returns paginated newest-first JSON for client-side up-arrow recall. - server_detail() now passes the last 50 history rows as console_history for server-side replay on page load. - 404 on ownership mismatch — no admin override. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
613 lines
21 KiB
Python
613 lines
21 KiB
Python
import json
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
from flask import Blueprint, Response, current_app, redirect, render_template, request
|
|
from sqlalchemy import func, select, update
|
|
|
|
from l4d2web.auth import current_user, require_admin, require_login
|
|
from l4d2web.db import session_scope
|
|
from l4d2web.models import Blueprint as BlueprintModel
|
|
from l4d2web.models import (
|
|
BlueprintOverlay,
|
|
CommandHistory,
|
|
Job,
|
|
Overlay,
|
|
OverlayWorkshopItem,
|
|
Server,
|
|
ServerLiveState,
|
|
User,
|
|
WorkshopItem,
|
|
)
|
|
from l4d2web.services.overlay_files import (
|
|
list_directory,
|
|
safe_resolve_for_listing,
|
|
safe_resolve_for_server_listing,
|
|
)
|
|
|
|
|
|
bp = Blueprint("pages", __name__)
|
|
|
|
|
|
@bp.get("/dashboard")
|
|
@require_login
|
|
def dashboard() -> str:
|
|
return render_template("dashboard.html")
|
|
|
|
|
|
@bp.get("/admin")
|
|
@require_admin
|
|
def admin_home() -> str:
|
|
return render_template("admin.html")
|
|
|
|
|
|
@bp.post("/admin/install")
|
|
@require_admin
|
|
def enqueue_runtime_install() -> Response:
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
db.add(Job(user_id=user.id, server_id=None, operation="install", state="queued"))
|
|
return redirect("/admin/jobs")
|
|
|
|
|
|
@bp.get("/admin/users")
|
|
@require_admin
|
|
def admin_users() -> str:
|
|
with session_scope() as db:
|
|
users = db.scalars(select(User).order_by(User.username)).all()
|
|
return render_template("admin_users.html", users=users)
|
|
|
|
|
|
@bp.post("/admin/users/<int:user_id>/deactivate")
|
|
@require_admin
|
|
def admin_users_deactivate(user_id: int) -> Response:
|
|
actor = current_user()
|
|
assert actor is not None
|
|
if actor.id == user_id:
|
|
return Response("cannot deactivate yourself", status=409)
|
|
with session_scope() as db:
|
|
target = db.scalar(select(User).where(User.id == user_id))
|
|
if target is None:
|
|
return Response(status=404)
|
|
target.active = False
|
|
return redirect("/admin/users")
|
|
|
|
|
|
@bp.post("/admin/users/<int:user_id>/activate")
|
|
@require_admin
|
|
def admin_users_activate(user_id: int) -> Response:
|
|
with session_scope() as db:
|
|
target = db.scalar(select(User).where(User.id == user_id))
|
|
if target is None:
|
|
return Response(status=404)
|
|
target.active = True
|
|
return redirect("/admin/users")
|
|
|
|
|
|
@bp.post("/admin/users/<int:user_id>/delete")
|
|
@require_admin
|
|
def admin_users_delete(user_id: int) -> Response:
|
|
actor = current_user()
|
|
assert actor is not None
|
|
if actor.id == user_id:
|
|
return Response("cannot delete yourself", status=409)
|
|
|
|
with session_scope() as db:
|
|
target = db.scalar(select(User).where(User.id == user_id))
|
|
if target is None:
|
|
return Response(status=404)
|
|
|
|
if target.admin:
|
|
other_admins = db.scalar(
|
|
select(func.count(User.id)).where(User.admin.is_(True), User.id != user_id)
|
|
) or 0
|
|
if other_admins == 0:
|
|
return Response("cannot delete the last admin", status=409)
|
|
|
|
owned_servers = db.scalar(
|
|
select(func.count(Server.id)).where(Server.user_id == user_id)
|
|
) or 0
|
|
owned_blueprints = db.scalar(
|
|
select(func.count(BlueprintModel.id)).where(BlueprintModel.user_id == user_id)
|
|
) or 0
|
|
owned_overlays = db.scalar(
|
|
select(func.count(Overlay.id)).where(Overlay.user_id == user_id)
|
|
) or 0
|
|
if owned_servers or owned_blueprints or owned_overlays:
|
|
return Response(
|
|
f"user owns content: {owned_servers} server(s), "
|
|
f"{owned_blueprints} blueprint(s), {owned_overlays} overlay(s) — "
|
|
"delete those first",
|
|
status=409,
|
|
)
|
|
|
|
# Job rows have nullable user_id — keep them as audit trail with the FK nulled out.
|
|
db.execute(update(Job).where(Job.user_id == user_id).values(user_id=None))
|
|
db.delete(target)
|
|
|
|
return redirect("/admin/users")
|
|
|
|
|
|
@bp.get("/admin/jobs")
|
|
@require_admin
|
|
def admin_jobs() -> str:
|
|
with session_scope() as db:
|
|
rows = db.execute(
|
|
select(Job, User, Server)
|
|
.outerjoin(User, User.id == Job.user_id)
|
|
.outerjoin(Server, Server.id == Job.server_id)
|
|
.order_by(Job.created_at.desc())
|
|
).all()
|
|
return render_template("admin_jobs.html", rows=rows)
|
|
|
|
|
|
@bp.get("/servers")
|
|
@require_login
|
|
def servers_page() -> str:
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
rows = db.execute(
|
|
select(Server, BlueprintModel)
|
|
.join(BlueprintModel, BlueprintModel.id == Server.blueprint_id)
|
|
.where(Server.user_id == user.id)
|
|
.order_by(Server.name)
|
|
).all()
|
|
blueprints = db.scalars(
|
|
select(BlueprintModel).where(BlueprintModel.user_id == user.id).order_by(BlueprintModel.name)
|
|
).all()
|
|
|
|
stale_seconds = current_app.config.get("LIVE_STATE_STALE_SECONDS", 30)
|
|
cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(seconds=stale_seconds)
|
|
|
|
server_ids = [s.id for s, _bp in rows]
|
|
latest_rows: dict[int, ServerLiveState] = {}
|
|
if server_ids:
|
|
subq = (
|
|
select(
|
|
ServerLiveState.server_id,
|
|
func.max(ServerLiveState.started_at).label("mx"),
|
|
)
|
|
.where(ServerLiveState.server_id.in_(server_ids))
|
|
.group_by(ServerLiveState.server_id)
|
|
.subquery()
|
|
)
|
|
sls_rows = db.scalars(
|
|
select(ServerLiveState).join(
|
|
subq,
|
|
(ServerLiveState.server_id == subq.c.server_id)
|
|
& (ServerLiveState.started_at == subq.c.mx),
|
|
)
|
|
).all()
|
|
for r in sls_rows:
|
|
latest_rows[r.server_id] = r
|
|
|
|
live_state_by_server: dict[int, dict] = {}
|
|
for sid, row in latest_rows.items():
|
|
fresh = row.last_seen_at >= cutoff
|
|
live_state_by_server[sid] = {
|
|
"fresh": fresh,
|
|
"players": row.players,
|
|
"max_players": row.max_players,
|
|
"map": row.map,
|
|
"hibernating": row.hibernating,
|
|
}
|
|
|
|
prefill_blueprint_id: int | None = None
|
|
raw_prefill = request.args.get("blueprint_id")
|
|
if raw_prefill:
|
|
try:
|
|
candidate = int(raw_prefill)
|
|
except ValueError:
|
|
candidate = None
|
|
if candidate is not None and any(b.id == candidate for b in blueprints):
|
|
prefill_blueprint_id = candidate
|
|
|
|
return render_template(
|
|
"servers.html",
|
|
rows=rows,
|
|
blueprints=blueprints,
|
|
prefill_blueprint_id=prefill_blueprint_id,
|
|
live_state_by_server=live_state_by_server,
|
|
)
|
|
|
|
|
|
_OPERATION_GERUND = {
|
|
"start": "starting",
|
|
"stop": "stopping",
|
|
"reset": "resetting",
|
|
"delete": "deleting",
|
|
"initialize": "initializing",
|
|
}
|
|
|
|
_TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
|
|
|
|
|
|
def _build_server_actions_context(db, server) -> dict:
|
|
from l4d2web.services.timeago import humanize_delta
|
|
|
|
latest_job = db.scalar(
|
|
select(Job)
|
|
.where(Job.server_id == server.id)
|
|
.order_by(Job.created_at.desc())
|
|
.limit(1)
|
|
)
|
|
if latest_job is not None:
|
|
db.expunge(latest_job)
|
|
actual_state = server.actual_state
|
|
desired_state = server.desired_state
|
|
|
|
active_operation = (
|
|
latest_job.operation
|
|
if latest_job is not None and latest_job.state not in _TERMINAL_JOB_STATES
|
|
else None
|
|
)
|
|
has_active_job = active_operation is not None
|
|
|
|
if has_active_job:
|
|
display_state = _OPERATION_GERUND.get(active_operation, active_operation) + "…"
|
|
state_class = "state-transient"
|
|
elif actual_state == "running":
|
|
display_state = "running"
|
|
state_class = "state-running"
|
|
elif actual_state == "stopped":
|
|
display_state = "stopped"
|
|
state_class = "state-stopped"
|
|
else:
|
|
display_state = actual_state or "unknown"
|
|
state_class = "state-unknown"
|
|
|
|
visible_buttons: list[str] = []
|
|
if not has_active_job:
|
|
if actual_state == "running":
|
|
visible_buttons.append("stop")
|
|
else:
|
|
visible_buttons.append("start")
|
|
visible_buttons.append("reset")
|
|
|
|
drift = (not has_active_job) and desired_state != actual_state
|
|
|
|
latest_job_phrase: str | None = None
|
|
latest_job_when: str | None = None
|
|
latest_job_is_running = False
|
|
if latest_job is not None:
|
|
if latest_job.state in _TERMINAL_JOB_STATES:
|
|
latest_job_phrase = f"{latest_job.operation} {latest_job.state}"
|
|
ref_time = latest_job.finished_at or latest_job.created_at
|
|
else:
|
|
latest_job_phrase = _OPERATION_GERUND.get(latest_job.operation, latest_job.operation)
|
|
latest_job_is_running = True
|
|
ref_time = latest_job.started_at or latest_job.created_at
|
|
latest_job_when = humanize_delta(ref_time)
|
|
|
|
return {
|
|
"display_state": display_state,
|
|
"state_class": state_class,
|
|
"visible_buttons": visible_buttons,
|
|
"drift": drift,
|
|
"latest_job": latest_job,
|
|
"latest_job_phrase": latest_job_phrase,
|
|
"latest_job_when": latest_job_when,
|
|
"latest_job_is_running": latest_job_is_running,
|
|
}
|
|
|
|
|
|
@bp.get("/servers/<int:server_id>")
|
|
@require_login
|
|
def server_detail(server_id: int):
|
|
user = current_user()
|
|
assert user is not None
|
|
|
|
with session_scope() as db:
|
|
server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id))
|
|
if server is None:
|
|
return Response(status=404)
|
|
blueprint = db.scalar(select(BlueprintModel).where(BlueprintModel.id == server.blueprint_id))
|
|
ctx = _build_server_actions_context(db, server)
|
|
console_history = list(
|
|
reversed(
|
|
db.scalars(
|
|
select(CommandHistory)
|
|
.where(
|
|
CommandHistory.user_id == user.id,
|
|
CommandHistory.server_id == server.id,
|
|
)
|
|
.order_by(CommandHistory.id.desc())
|
|
.limit(50)
|
|
).all()
|
|
)
|
|
)
|
|
|
|
connect_host = request.host.split(":")[0]
|
|
file_tree_root_entries, file_tree_truncated_count = _root_server_file_tree(server_id)
|
|
|
|
return render_template(
|
|
"server_detail.html",
|
|
server=server,
|
|
blueprint=blueprint,
|
|
connect_host=connect_host,
|
|
file_tree_root_entries=file_tree_root_entries,
|
|
file_tree_truncated=file_tree_truncated_count > 0
|
|
if file_tree_root_entries is not None
|
|
else False,
|
|
file_tree_truncated_count=file_tree_truncated_count,
|
|
console_history=console_history,
|
|
**ctx,
|
|
)
|
|
|
|
|
|
def _root_server_file_tree(server_id: int) -> tuple[list[dict] | None, int]:
|
|
"""Root listing of `runtime/<server_id>/merged`. Returns (None, 0) when
|
|
the merged dir doesn't exist yet (server never started or just reset)."""
|
|
try:
|
|
merged_root = safe_resolve_for_server_listing(server_id, "")
|
|
except ValueError:
|
|
return None, 0
|
|
if merged_root is None:
|
|
return None, 0
|
|
entries, truncated_count = list_directory(merged_root, merged_root)
|
|
return entries, truncated_count
|
|
|
|
|
|
@bp.get("/servers/<int:server_id>/actions")
|
|
@require_login
|
|
def server_actions_fragment(server_id: int):
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id))
|
|
if server is None:
|
|
return Response(status=404)
|
|
ctx = _build_server_actions_context(db, server)
|
|
return render_template("_server_actions.html", server=server, **ctx)
|
|
|
|
|
|
@bp.get("/servers/<int:server_id>/jobs")
|
|
@require_login
|
|
def server_jobs_page(server_id: int):
|
|
user = current_user()
|
|
assert user is not None
|
|
|
|
with session_scope() as db:
|
|
server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id))
|
|
if server is None:
|
|
return Response(status=404)
|
|
rows = db.execute(
|
|
select(Job, User, Server)
|
|
.outerjoin(User, User.id == Job.user_id)
|
|
.outerjoin(Server, Server.id == Job.server_id)
|
|
.where(Job.server_id == server.id)
|
|
.order_by(Job.created_at.desc())
|
|
).all()
|
|
|
|
return render_template("server_jobs.html", server=server, rows=rows)
|
|
|
|
|
|
@bp.get("/overlays")
|
|
@require_login
|
|
def overlays() -> str:
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
query = select(Overlay).order_by(Overlay.name)
|
|
if not user.admin:
|
|
query = query.where(
|
|
Overlay.user_id.is_(None) | (Overlay.user_id == user.id)
|
|
)
|
|
overlays = db.scalars(query).all()
|
|
return render_template("overlays.html", overlays=overlays)
|
|
|
|
|
|
@bp.get("/overlays/<int:overlay_id>/jobs")
|
|
@require_login
|
|
def overlay_jobs_page(overlay_id: int):
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
|
|
if overlay is None:
|
|
return Response(status=404)
|
|
if not user.admin and overlay.user_id is not None and overlay.user_id != user.id:
|
|
return Response(status=403)
|
|
rows = db.execute(
|
|
select(Job, User, Server)
|
|
.outerjoin(User, User.id == Job.user_id)
|
|
.outerjoin(Server, Server.id == Job.server_id)
|
|
.where(Job.operation == "build_overlay", Job.overlay_id == overlay.id)
|
|
.order_by(Job.created_at.desc())
|
|
).all()
|
|
return render_template("overlay_jobs.html", overlay=overlay, rows=rows)
|
|
|
|
|
|
_BUILD_STATE_LABELS = {
|
|
"ok": ("ok", "state-running"),
|
|
"failed": ("failed", "state-stopped"),
|
|
"": ("never built", "state-unknown"),
|
|
}
|
|
|
|
|
|
def _build_overlay_build_status_context(db, overlay) -> dict:
|
|
from l4d2web.services.timeago import humanize_delta
|
|
|
|
latest_build = db.scalar(
|
|
select(Job)
|
|
.where(Job.operation == "build_overlay", Job.overlay_id == overlay.id)
|
|
.order_by(Job.created_at.desc())
|
|
.limit(1)
|
|
)
|
|
if latest_build is not None:
|
|
db.expunge(latest_build)
|
|
|
|
is_running = (
|
|
latest_build is not None and latest_build.state not in _TERMINAL_JOB_STATES
|
|
)
|
|
|
|
if is_running:
|
|
build_state_label = "building…"
|
|
build_state_class = "state-transient"
|
|
else:
|
|
build_state_label, build_state_class = _BUILD_STATE_LABELS.get(
|
|
overlay.last_build_status or "", _BUILD_STATE_LABELS[""]
|
|
)
|
|
|
|
latest_build_phrase: str | None = None
|
|
latest_build_when: str | None = None
|
|
if latest_build is not None:
|
|
if latest_build.state in _TERMINAL_JOB_STATES:
|
|
latest_build_phrase = f"{latest_build.operation} {latest_build.state}"
|
|
ref_time = latest_build.finished_at or latest_build.created_at
|
|
else:
|
|
latest_build_phrase = "building"
|
|
ref_time = latest_build.started_at or latest_build.created_at
|
|
latest_build_when = humanize_delta(ref_time)
|
|
|
|
return {
|
|
"latest_build": latest_build,
|
|
"latest_build_is_running": is_running,
|
|
"latest_build_phrase": latest_build_phrase,
|
|
"latest_build_when": latest_build_when,
|
|
"build_state_label": build_state_label,
|
|
"build_state_class": build_state_class,
|
|
}
|
|
|
|
|
|
@bp.get("/overlays/<int:overlay_id>")
|
|
@require_login
|
|
def overlay_detail(overlay_id: int):
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
|
|
if overlay is None:
|
|
return Response(status=404)
|
|
if not user.admin and overlay.user_id is not None and overlay.user_id != user.id:
|
|
return Response(status=403)
|
|
using_blueprints_query = (
|
|
select(BlueprintModel)
|
|
.join(BlueprintOverlay, BlueprintOverlay.blueprint_id == BlueprintModel.id)
|
|
.where(BlueprintOverlay.overlay_id == overlay.id)
|
|
.order_by(BlueprintModel.name)
|
|
)
|
|
if not user.admin:
|
|
using_blueprints_query = using_blueprints_query.where(BlueprintModel.user_id == user.id)
|
|
using_blueprints = db.scalars(using_blueprints_query).all()
|
|
workshop_items = []
|
|
if overlay.type == "workshop":
|
|
workshop_items = db.scalars(
|
|
select(WorkshopItem)
|
|
.join(
|
|
OverlayWorkshopItem,
|
|
OverlayWorkshopItem.workshop_item_id == WorkshopItem.id,
|
|
)
|
|
.where(OverlayWorkshopItem.overlay_id == overlay.id)
|
|
.order_by(WorkshopItem.created_at)
|
|
).all()
|
|
build_ctx = _build_overlay_build_status_context(db, overlay)
|
|
|
|
file_tree_root_entries, file_tree_truncated_count = _root_file_tree(overlay)
|
|
|
|
return render_template(
|
|
"overlay_detail.html",
|
|
overlay=overlay,
|
|
using_blueprints=using_blueprints,
|
|
workshop_items=workshop_items,
|
|
file_tree_root_entries=file_tree_root_entries,
|
|
file_tree_truncated=file_tree_truncated_count > 0
|
|
if file_tree_root_entries is not None
|
|
else False,
|
|
file_tree_truncated_count=file_tree_truncated_count,
|
|
**build_ctx,
|
|
)
|
|
|
|
|
|
@bp.get("/overlays/<int:overlay_id>/build-status")
|
|
@require_login
|
|
def overlay_build_status_fragment(overlay_id: int):
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
|
|
if overlay is None:
|
|
return Response(status=404)
|
|
if not user.admin and overlay.user_id is not None and overlay.user_id != user.id:
|
|
return Response(status=403)
|
|
ctx = _build_overlay_build_status_context(db, overlay)
|
|
return render_template("_overlay_build_status.html", overlay=overlay, **ctx)
|
|
|
|
|
|
def _root_file_tree(overlay: Overlay) -> tuple[list[dict] | None, int]:
|
|
"""Return (entries, truncated_count) for the overlay's runtime directory,
|
|
or (None, 0) if the directory doesn't exist or the path is unresolvable
|
|
(e.g. legacy absolute `overlay.path` values that pre-date the current
|
|
`path == str(id)` convention)."""
|
|
try:
|
|
overlay_root = safe_resolve_for_listing(overlay.path, "")
|
|
except ValueError:
|
|
return None, 0
|
|
if not overlay_root.is_dir():
|
|
return None, 0
|
|
entries, truncated_count = list_directory(overlay_root, overlay_root)
|
|
return entries, truncated_count
|
|
|
|
|
|
@bp.get("/blueprints")
|
|
@require_login
|
|
def blueprints_page() -> str:
|
|
user = current_user()
|
|
assert user is not None
|
|
with session_scope() as db:
|
|
blueprints = db.scalars(
|
|
select(BlueprintModel).where(BlueprintModel.user_id == user.id).order_by(BlueprintModel.name)
|
|
).all()
|
|
return render_template("blueprints.html", blueprints=blueprints)
|
|
|
|
|
|
@bp.get("/blueprints/<int:blueprint_id>")
|
|
@require_login
|
|
def blueprint_page(blueprint_id: int):
|
|
user = current_user()
|
|
assert user is not None
|
|
|
|
with session_scope() as db:
|
|
blueprint = db.scalar(select(BlueprintModel).where(BlueprintModel.id == blueprint_id))
|
|
if blueprint is None:
|
|
return Response(status=404)
|
|
if blueprint.user_id != user.id:
|
|
return Response(status=403)
|
|
|
|
selected_overlays = db.scalars(
|
|
select(Overlay)
|
|
.join(BlueprintOverlay, BlueprintOverlay.overlay_id == Overlay.id)
|
|
.where(BlueprintOverlay.blueprint_id == blueprint.id)
|
|
.order_by(BlueprintOverlay.position)
|
|
).all()
|
|
position_rows = db.execute(
|
|
select(BlueprintOverlay.overlay_id, BlueprintOverlay.position)
|
|
.where(BlueprintOverlay.blueprint_id == blueprint.id)
|
|
).all()
|
|
expose_rows = db.execute(
|
|
select(BlueprintOverlay.overlay_id, BlueprintOverlay.expose_server_cfg)
|
|
.where(BlueprintOverlay.blueprint_id == blueprint.id)
|
|
).all()
|
|
all_overlays = db.scalars(
|
|
select(Overlay)
|
|
.where(Overlay.user_id.is_(None) | (Overlay.user_id == user.id))
|
|
.order_by(Overlay.name)
|
|
).all()
|
|
|
|
overlay_positions = {overlay_id: position + 1 for overlay_id, position in position_rows}
|
|
overlay_expose_state = {overlay_id: bool(expose) for overlay_id, expose in expose_rows}
|
|
selected_ids = {overlay.id for overlay in selected_overlays}
|
|
available_overlays = [overlay for overlay in all_overlays if overlay.id not in selected_ids]
|
|
return render_template(
|
|
"blueprint_detail.html",
|
|
blueprint=blueprint,
|
|
selected_overlays=selected_overlays,
|
|
available_overlays=available_overlays,
|
|
all_overlays=all_overlays,
|
|
selected_overlay_ids=selected_ids,
|
|
overlay_positions=overlay_positions,
|
|
overlay_expose_state=overlay_expose_state,
|
|
arguments=json.loads(blueprint.arguments),
|
|
config_lines=json.loads(blueprint.config),
|
|
)
|