feat(l4d2-web): managed global map overlays with daily refresh

Adds two managed system overlays (l4d2center-maps, cedapug-maps) that
fetch curated map archives from upstream sources and reconcile addons
symlinks for non-Steam maps. A daily systemd timer enqueues a coalesced
refresh_global_overlays worker job; downloads, extraction, and rebuilds
run in the existing job worker and surface in the job log UI.

Schema: GlobalOverlaySource / GlobalOverlayItem / GlobalOverlayItemFile
plus nullable Job.user_id so system jobs render as "system" in the UI.
The new builder reconciles symlinks against the per-source vpk cache
and leaves foreign symlinks untouched. Initialize-time guard refuses
to mount a partial overlay if any expected vpk is missing from cache.

Refresh service uses shutil.move to handle EXDEV when /tmp and the
cache live on different filesystems.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
mwiegand 2026-05-08 08:05:14 +02:00
parent 4f78574edd
commit 92d6ebbe82
No known key found for this signature in database
39 changed files with 4770 additions and 28 deletions

View file

@ -14,6 +14,7 @@ The deployment uses these paths:
- `/var/lib/left4me/installation`: shared L4D2 installation.
- `/var/lib/left4me/overlays`: overlay directories. External (admin-managed) overlays still live at any relative path under here; new overlays created through the web UI use `${overlay_id}` as their path.
- `/var/lib/left4me/workshop_cache`: deduplicated cache of `.vpk` files downloaded for workshop overlays. One file per Steam item, named `{steam_id}.vpk`. Workshop overlays symlink into this tree.
- `/var/lib/left4me/global_overlay_cache`: cache of non-Steam map archives and extracted `.vpk` files used by managed global map overlays.
- `/var/lib/left4me/instances`: rendered instance specifications and per-instance state.
- `/var/lib/left4me/runtime`: per-instance runtime mount directories.
- `/var/lib/left4me/tmp`: temporary files used by deployment/runtime operations.
@ -41,6 +42,10 @@ deploy/deploy-test-server.sh deploy-user@example-host
The SSH user must be able to run `sudo` on the target host. The deployment configures system packages, directories, environment files, helper scripts, sudoers rules, Python dependencies, and systemd units.
## Scheduled Jobs
`left4me-refresh-global-overlays.timer` runs daily with `Persistent=true`. It invokes `flask refresh-global-overlays`, which only enqueues a `refresh_global_overlays` job; downloads and rebuilds run in the web worker and are visible in the normal job log UI.
## Admin Bootstrap
Set the bootstrap credentials in the environment when creating the first admin user:

View file

@ -97,6 +97,7 @@ $sudo_cmd mkdir -p \
/var/lib/left4me/instances \
/var/lib/left4me/runtime \
/var/lib/left4me/workshop_cache \
/var/lib/left4me/global_overlay_cache \
/var/lib/left4me/tmp
$sudo_cmd chown left4me:left4me \
@ -106,6 +107,7 @@ $sudo_cmd chown left4me:left4me \
/var/lib/left4me/instances \
/var/lib/left4me/runtime \
/var/lib/left4me/workshop_cache \
/var/lib/left4me/global_overlay_cache \
/var/lib/left4me/tmp
$sudo_cmd chown -R left4me:left4me /opt/left4me
@ -124,6 +126,8 @@ $sudo_cmd chown -R left4me:left4me /opt/left4me
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/lib/systemd/system/left4me-web.service /usr/local/lib/systemd/system/left4me-web.service
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/lib/systemd/system/left4me-server@.service /usr/local/lib/systemd/system/left4me-server@.service
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/lib/systemd/system/left4me-refresh-global-overlays.service /usr/local/lib/systemd/system/left4me-refresh-global-overlays.service
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/lib/systemd/system/left4me-refresh-global-overlays.timer /usr/local/lib/systemd/system/left4me-refresh-global-overlays.timer
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/libexec/left4me/left4me-systemctl /usr/local/libexec/left4me/left4me-systemctl
$sudo_cmd cp /opt/left4me/deploy/files/usr/local/libexec/left4me/left4me-journalctl /usr/local/libexec/left4me/left4me-journalctl
$sudo_cmd chmod 0755 /usr/local/libexec/left4me/left4me-systemctl /usr/local/libexec/left4me/left4me-journalctl
@ -176,6 +180,7 @@ fi
$sudo_cmd systemctl daemon-reload
$sudo_cmd systemctl enable --now left4me-web.service
$sudo_cmd systemctl restart left4me-web.service
$sudo_cmd systemctl enable --now left4me-refresh-global-overlays.timer
for attempt in 1 2 3 4 5 6 7 8 9 10; do
if curl -fsS http://127.0.0.1:8000/health; then
exit 0

View file

@ -0,0 +1,17 @@
[Unit]
Description=left4me refresh global map overlays
# Ordered after the web service so the enqueued job lands in a running worker;
# network-online is wanted because enqueueing is followed by upstream fetches.
After=network-online.target left4me-web.service
Wants=network-online.target
[Service]
Type=oneshot
User=left4me
Group=left4me
WorkingDirectory=/opt/left4me
Environment=HOME=/var/lib/left4me
Environment=PATH=/opt/left4me/.venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
# Same environment files as left4me-web.service so the CLI sees identical config.
EnvironmentFile=/etc/left4me/host.env
EnvironmentFile=/etc/left4me/web.env
# The CLI only enqueues the coalesced refresh job; downloads run in the web worker.
ExecStart=/opt/left4me/.venv/bin/flask --app l4d2web.app:create_app refresh-global-overlays
# Read-only system directories, but keep the app state tree writable.
ProtectSystem=full
ReadWritePaths=/var/lib/left4me

View file

@ -0,0 +1,10 @@
[Unit]
Description=Daily left4me global map overlay refresh
[Timer]
OnCalendar=daily
# Persistent=true: run on next boot if the scheduled time was missed while off.
Persistent=true
Unit=left4me-refresh-global-overlays.service
[Install]
WantedBy=timers.target

View file

@ -9,6 +9,8 @@ DEPLOY = ROOT / "deploy"
WEB_UNIT = DEPLOY / "files/usr/local/lib/systemd/system/left4me-web.service"
SERVER_UNIT = DEPLOY / "files/usr/local/lib/systemd/system/left4me-server@.service"
GLOBAL_REFRESH_SERVICE = DEPLOY / "files/usr/local/lib/systemd/system/left4me-refresh-global-overlays.service"
GLOBAL_REFRESH_TIMER = DEPLOY / "files/usr/local/lib/systemd/system/left4me-refresh-global-overlays.timer"
SYSTEMCTL_HELPER = DEPLOY / "files/usr/local/libexec/left4me/left4me-systemctl"
JOURNALCTL_HELPER = DEPLOY / "files/usr/local/libexec/left4me/left4me-journalctl"
SUDOERS = DEPLOY / "files/etc/sudoers.d/left4me"
@ -203,3 +205,25 @@ def test_deploy_script_runs_migrations_before_app_initialization() -> None:
def test_deploy_script_shell_syntax() -> None:
    """Run a POSIX shell syntax check (`sh -n`) over the deploy script."""
    syntax_check = ["sh", "-n", str(DEPLOY_SCRIPT)]
    subprocess.run(syntax_check, check=True)
def test_global_refresh_timer_units_exist_and_enqueue_only():
    """The refresh service/timer units exist and the service only enqueues work."""
    service_text = GLOBAL_REFRESH_SERVICE.read_text()
    timer_text = GLOBAL_REFRESH_TIMER.read_text()
    service_requirements = (
        "User=left4me",
        "EnvironmentFile=/etc/left4me/host.env",
        "EnvironmentFile=/etc/left4me/web.env",
        "flask --app l4d2web.app:create_app refresh-global-overlays",
    )
    for needle in service_requirements:
        assert needle in service_text
    timer_requirements = (
        "OnCalendar=daily",
        "Persistent=true",
        "WantedBy=timers.target",
    )
    for needle in timer_requirements:
        assert needle in timer_text
def test_deploy_script_installs_and_enables_global_refresh_timer():
    """Deploy script creates the cache dir, installs both units, enables the timer."""
    script_text = DEPLOY_SCRIPT.read_text()
    required_fragments = (
        "/var/lib/left4me/global_overlay_cache",
        "left4me-refresh-global-overlays.service",
        "left4me-refresh-global-overlays.timer",
        "systemctl enable --now left4me-refresh-global-overlays.timer",
    )
    for fragment in required_fragments:
        assert fragment in script_text

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,353 @@
# L4D2 Global Map Overlays Design
**Goal:** Add two managed, system-wide map overlays, `l4d2center-maps` and `cedapug-maps`, populated from upstream map sources and refreshed daily through the existing job system.
**Approval status:** User-approved design direction. Implementation must not start until this spec is reviewed and an implementation plan is written.
## Context
`left4me` already has typed overlays, a builder registry, global overlays through `Overlay.user_id = NULL`, and queued overlay build jobs. Steam Workshop overlays use a cache plus symlinks into `left4dead2/addons/`, and server initialization already runs overlay builders before calling `l4d2ctl initialize`.
Global map sources fit the same model. The host library remains unchanged: it receives overlay refs and mounts directories. The web app owns map-source fetching, cache management, reconciliation, and job logs.
The two upstream sources are:
- `https://l4d2center.com/maps/servers/index.csv`
- `https://cedapug.com/custom`
## Locked Decisions
1. **One general operation.** Use `refresh_global_overlays`, not source-specific cron operations.
2. **Systemd owns time.** A systemd timer runs daily and invokes a Flask CLI command. The CLI only enqueues work; the existing worker performs downloads and writes logs.
3. **System jobs are nullable-owner jobs.** `jobs.user_id` becomes nullable. `NULL` means the job was created by the system. UI displays owner as `system`. Only admins can access system jobs.
4. **Managed global overlays are auto-seeded.** The app creates or repairs exactly one `l4d2center-maps` overlay and exactly one `cedapug-maps` overlay.
5. **Global overlays are normal system overlays for users.** `Overlay.user_id = NULL` makes them visible to every authenticated user and selectable in every user's blueprint editor.
6. **Managed types are not user-creatable.** Normal overlay creation does not offer `l4d2center_maps` or `cedapug_maps`. The seeder is the only code path that creates those types.
7. **Exact reconciliation.** Refresh makes each managed overlay match its upstream manifest. Removed upstream maps are removed from the managed overlay symlink set. Foreign files are left alone and logged.
8. **No initialize-time downloads.** `initialize_server()` may run builders to repair symlinks, but it must not fetch remote manifests or download large archives. Missing cache content fails clearly.
9. **Separate cache from Workshop.** Non-Steam global maps use `${LEFT4ME_ROOT}/global_overlay_cache`, not `${LEFT4ME_ROOT}/workshop_cache`.
10. **Source-specific parsing stays explicit.** Do not introduce a generic arbitrary HTTP source framework in this phase.
## Architecture
The design extends the existing overlay-builder registry:
```python
BUILDERS = {
"external": ExternalBuilder(),
"workshop": WorkshopBuilder(),
"l4d2center_maps": GlobalMapOverlayBuilder(),
"cedapug_maps": GlobalMapOverlayBuilder(),
}
```
Both global map overlay types share the same filesystem builder. Source-specific code lives in refresh services that know how to fetch and parse upstream manifests.
High-level flow:
```text
systemd timer
-> flask refresh-global-overlays
-> ensure_global_overlays()
-> enqueue refresh_global_overlays job (coalesced)
-> worker fetches manifests
-> worker downloads/extracts cache files
-> worker records desired VPK files
-> worker rebuilds overlay symlinks directly
```
Auto-seeded overlay rows use fixed names, managed types, `user_id = NULL`, and web-generated paths:
```text
name=l4d2center-maps, type=l4d2center_maps, user_id=NULL, path=str(id)
name=cedapug-maps, type=cedapug_maps, user_id=NULL, path=str(id)
```
## Data Model
### `jobs`
Change `jobs.user_id` from required to nullable.
`NULL` means a system-created job. Authorization rules become:
- Admins can view, stream, and cancel every job, including system jobs.
- Non-admins can access only jobs where `job.user_id == current_user.id`.
- System jobs are not visible to non-admins through direct job URLs.
Job list/detail pages use outer joins to `users` and render missing owners as `system`.
### `global_overlay_sources`
One row per managed global source overlay:
```text
id INTEGER PRIMARY KEY
overlay_id INTEGER NOT NULL UNIQUE REFERENCES overlays(id) ON DELETE CASCADE
source_key VARCHAR(64) NOT NULL UNIQUE -- l4d2center-maps | cedapug-maps
source_type VARCHAR(32) NOT NULL -- l4d2center_csv | cedapug_custom_page
source_url TEXT NOT NULL
last_manifest_hash VARCHAR(64) NOT NULL DEFAULT ''
last_refreshed_at DATETIME NULL
last_error TEXT NOT NULL DEFAULT ''
created_at DATETIME NOT NULL
updated_at DATETIME NOT NULL
```
`source_key` is stable and used by the seeder to repair missing rows.
### `global_overlay_items`
One row per manifest item belonging to a global overlay source:
```text
id INTEGER PRIMARY KEY
source_id INTEGER NOT NULL REFERENCES global_overlay_sources(id) ON DELETE CASCADE
item_key VARCHAR(255) NOT NULL -- stable per source
display_name VARCHAR(255) NOT NULL DEFAULT ''
download_url TEXT NOT NULL
expected_vpk_name VARCHAR(255) NOT NULL DEFAULT ''
expected_size BIGINT NULL
expected_md5 VARCHAR(32) NOT NULL DEFAULT ''
etag VARCHAR(255) NOT NULL DEFAULT ''
last_modified VARCHAR(255) NOT NULL DEFAULT ''
content_length BIGINT NULL
last_downloaded_at DATETIME NULL
last_error TEXT NOT NULL DEFAULT ''
created_at DATETIME NOT NULL
updated_at DATETIME NOT NULL
UNIQUE(source_id, item_key)
```
For `l4d2center`, `item_key` and `expected_vpk_name` come from the CSV `Name` column, and `expected_size` / `expected_md5` come from the CSV.
For `cedapug`, `item_key` is the direct download URL path basename, normalized without query parameters. CEDAPUG does not publish checksums in the observed page, so integrity uses HTTP metadata when available and archive extraction checks.
### `global_overlay_item_files`
One row per extracted VPK file that should appear in an overlay:
```text
id INTEGER PRIMARY KEY
item_id INTEGER NOT NULL REFERENCES global_overlay_items(id) ON DELETE CASCADE
vpk_name VARCHAR(255) NOT NULL
cache_path TEXT NOT NULL -- relative path under global_overlay_cache
size BIGINT NOT NULL
md5 VARCHAR(32) NOT NULL DEFAULT ''
created_at DATETIME NOT NULL
updated_at DATETIME NOT NULL
UNIQUE(item_id, vpk_name)
```
This extra file table handles archives that contain more than one `.vpk` without overloading the item row.
## Filesystem Layout
Use a cache separate from Steam Workshop:
```text
${LEFT4ME_ROOT}/
global_overlay_cache/
l4d2center-maps/
archives/
vpks/
cedapug-maps/
archives/
vpks/
overlays/
{overlay_id}/
left4dead2/addons/
*.vpk -> absolute symlink to global_overlay_cache/.../vpks/*.vpk
```
Cache file writes are atomic: download to `*.partial`, extract to a temporary directory, verify, then `os.replace()` final VPK files.
Symlink targets are absolute, matching the existing Workshop overlay design.
## Source Parsing
### L4D2Center
Fetch `https://l4d2center.com/maps/servers/index.csv` with a normal HTTP timeout.
The CSV is semicolon-delimited and contains:
```text
Name;Size;md5;Download link
```
Each item produces:
- `item_key = Name`
- `expected_vpk_name = Name`
- `expected_size = Size`
- `expected_md5 = md5`
- `download_url = Download link`
Downloads are `.7z` archives. Extraction uses a Python 7z implementation such as `py7zr` so tests do not depend on a system `7z` binary. After extraction, the expected VPK file must exist and match both size and md5. A mismatch fails that item and leaves the prior cached file in place.
### CEDAPUG
Fetch `https://cedapug.com/custom` and parse the embedded `renderCustomMapDownloads([...])` data.
Only direct download links are managed in v1:
- Relative links like `/maps/FatalFreight.zip` are converted to absolute `https://cedapug.com/maps/FatalFreight.zip`.
- External `http` links are logged and skipped in v1.
- Entries without a download link are built-in campaigns and skipped.
Downloads are `.zip` archives extracted with Python's standard `zipfile`. Every `.vpk` in the archive becomes a managed output file for that item. If no `.vpk` is present, the item fails and the prior cached files remain in place.
Because CEDAPUG does not publish checksums in the observed page, refresh detects changes using `ETag`, `Last-Modified`, `Content-Length`, and local extracted file metadata when available. A manual refresh can force revalidation by clearing item metadata in a later maintenance path; no force-refresh UI is included in this design.
## Refresh Job
`refresh_global_overlays` is a global worker operation.
Behavior:
1. Ensure both managed global overlays and source rows exist.
2. Fetch both manifests.
3. Upsert manifest items.
4. Mark items absent from the manifest as no longer desired by deleting their item rows; cascading deletes remove their file rows.
5. Download and extract new or changed items.
6. Keep prior cache files when an item download or verification fails, but record `last_error`.
7. Rebuild symlinks for changed sources directly through the same builder interface used by `build_overlay`.
8. Emit clear job logs: manifest counts, downloads, skips, removals, verification failures, and build summaries.
`refresh_global_overlays` does not enqueue child `build_overlay` jobs. Direct builder invocation keeps the overlay in sync before the refresh job releases its global mutex, so a server job can never observe updated cache metadata paired with stale overlay symlinks.
Coalescing:
- If a `refresh_global_overlays` job is queued or running, CLI/admin requests return the existing job instead of inserting a duplicate.
## Builder Reconciliation
`GlobalMapOverlayBuilder` reads desired file rows for the overlay's source and reconciles only symlinks it manages.
Managed symlink rule:
- A symlink in `left4dead2/addons/` is managed if its resolved target is under `${LEFT4ME_ROOT}/global_overlay_cache/{source_key}/vpks/`.
- Managed symlinks absent from desired files are removed.
- Desired files missing from cache are skipped and logged as errors.
- Non-symlink files and symlinks outside the source cache are left untouched and logged as foreign entries.
This mirrors `WorkshopBuilder` behavior and keeps manual files safe.
## Scheduler Rules
`refresh_global_overlays` joins the existing global mutex group.
It must not run concurrently with:
- `install`
- `refresh_workshop_items`
- any `build_overlay`
- any server job (`initialize`, `start`, `stop`, `delete`)
No server or overlay job may start while `refresh_global_overlays` is running.
This conservative rule is acceptable because daily map refreshes are rare and large downloads should not race runtime changes.
## CLI And Systemd Timer
Add Flask CLI command:
```text
flask refresh-global-overlays
```
The command:
- Loads app config and DB.
- Ensures global overlays exist.
- Enqueues or returns the existing `refresh_global_overlays` job.
- Prints the job id.
- Does not run downloads itself.
Add deployment units:
```text
left4me-refresh-global-overlays.service
left4me-refresh-global-overlays.timer
```
Service command:
```text
/opt/left4me/.venv/bin/flask --app l4d2web.app:create_app refresh-global-overlays
```
Timer policy:
```text
OnCalendar=daily
Persistent=true
```
The service runs as the `left4me` user with `/etc/left4me/host.env` and `/etc/left4me/web.env`, matching `left4me-web.service`.
## Permissions And UI
Overlay list behavior:
- Admins see all overlays, including managed global map overlays.
- Non-admin users see system overlays and their own private workshop overlays.
- Managed global overlays appear in blueprint overlay selection for every user.
Creation behavior:
- Non-admin users can create only user-creatable types, currently `workshop`.
- Admins can create normal admin-creatable types, currently `external` and `workshop`.
- No user-facing create form offers `l4d2center_maps` or `cedapug_maps`.
- Auto-seeding is the only creation path for managed global map overlay types.
Admin controls:
- Add a manual "Refresh global overlays" action in the admin area.
- The action enqueues the same coalesced `refresh_global_overlays` job as the timer.
- Managed overlay detail pages show source type, source URL, last refresh time, last error, item count, and latest related jobs.
## Error Handling
- Manifest fetch failure fails the job if no source can be processed. If one source succeeds and one fails, the job still finishes in the failed state with partial-success logs, and prior content is preserved for the failed source.
- Per-item download failures do not abort sibling items.
- Verification failures keep prior cached files and record `last_error` on the item.
- Extraction rejects path traversal entries and ignores non-VPK files.
- Unsupported CEDAPUG external links are skipped with a warning.
- Initialize-time checks fail if desired global map files are missing from cache, naming the overlay and missing VPK names.
## Tests
Test coverage should include:
- Auto-seeding creates exactly one source overlay per source and repairs missing source rows.
- `jobs.user_id` nullable behavior, outer joins, and `system` display.
- Non-admins cannot access system jobs directly.
- CLI coalesces queued/running `refresh_global_overlays` jobs.
- Scheduler truth table for the new global operation.
- L4D2Center CSV parser with semicolon-delimited fixture data.
- CEDAPUG embedded JavaScript parser with fixture HTML.
- L4D2Center download/extract verifies VPK size and md5.
- CEDAPUG download/extract records every VPK in a zip archive.
- Reconcile removes obsolete managed symlinks and leaves foreign files alone.
- Overlay create UI rejects managed singleton types.
- Blueprint overlay selection includes managed global overlays for all users.
- Deployment tests cover the service and timer artifacts.
## Out Of Scope
- User-created global map source overlays.
- Arbitrary configurable HTTP manifest sources.
- Force-refresh UI for CEDAPUG items.
- Cache garbage collection for unreferenced archive files.
- Client-side map download UX.
- Steam Workshop links discovered on the CEDAPUG page; those are skipped rather than imported into workshop overlays.
- Host-library awareness of managed overlay types.
## Implementation Boundaries
- `l4d2host` remains unchanged.
- The web app continues to call host operations only through `l4d2ctl`.
- Existing blueprint semantics remain unchanged: overlays are live-linked, ordered, and first overlay has highest precedence.
- Existing workshop overlay behavior remains unchanged except scheduler interactions with the new global operation.

View file

@ -0,0 +1,105 @@
"""global map overlays
Revision ID: 0003_global_map_overlays
Revises: 0002_workshop_overlays
Create Date: 2026-05-07
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "0003_global_map_overlays"
down_revision: Union[str, Sequence[str], None] = "0002_workshop_overlays"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add the global map overlay schema and make job ownership optional."""
    # jobs.user_id becomes nullable: NULL marks a system-created job,
    # rendered as "system" in the job UI.
    with op.batch_alter_table("jobs") as batch_op:
        batch_op.alter_column("user_id", existing_type=sa.Integer(), nullable=True)
    # One row per managed source overlay (l4d2center-maps / cedapug-maps).
    op.create_table(
        "global_overlay_sources",
        sa.Column("id", sa.Integer(), primary_key=True),
        # Unique + CASCADE: each overlay has at most one source row,
        # removed together with the overlay.
        sa.Column(
            "overlay_id",
            sa.Integer(),
            sa.ForeignKey("overlays.id", ondelete="CASCADE"),
            nullable=False,
            unique=True,
        ),
        sa.Column("source_key", sa.String(length=64), nullable=False, unique=True),
        sa.Column("source_type", sa.String(length=32), nullable=False),
        sa.Column("source_url", sa.Text(), nullable=False),
        sa.Column("last_manifest_hash", sa.String(length=64), nullable=False, server_default=""),
        sa.Column("last_refreshed_at", sa.DateTime(), nullable=True),
        sa.Column("last_error", sa.Text(), nullable=False, server_default=""),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
    )
    op.create_index("ix_global_overlay_sources_type", "global_overlay_sources", ["source_type"])
    # One row per upstream manifest item; cascade-deleted with its source.
    op.create_table(
        "global_overlay_items",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column(
            "source_id",
            sa.Integer(),
            sa.ForeignKey("global_overlay_sources.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("item_key", sa.String(length=255), nullable=False),
        sa.Column("display_name", sa.String(length=255), nullable=False, server_default=""),
        sa.Column("download_url", sa.Text(), nullable=False),
        sa.Column("expected_vpk_name", sa.String(length=255), nullable=False, server_default=""),
        sa.Column("expected_size", sa.BigInteger(), nullable=True),
        sa.Column("expected_md5", sa.String(length=32), nullable=False, server_default=""),
        # HTTP revalidation metadata for sources without published checksums.
        sa.Column("etag", sa.String(length=255), nullable=False, server_default=""),
        sa.Column("last_modified", sa.String(length=255), nullable=False, server_default=""),
        sa.Column("content_length", sa.BigInteger(), nullable=True),
        sa.Column("last_downloaded_at", sa.DateTime(), nullable=True),
        sa.Column("last_error", sa.Text(), nullable=False, server_default=""),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.UniqueConstraint("source_id", "item_key", name="uq_global_overlay_item_source_key"),
    )
    op.create_index("ix_global_overlay_items_source", "global_overlay_items", ["source_id"])
    # One row per extracted VPK file; an item (archive) may yield several VPKs.
    op.create_table(
        "global_overlay_item_files",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column(
            "item_id",
            sa.Integer(),
            sa.ForeignKey("global_overlay_items.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("vpk_name", sa.String(length=255), nullable=False),
        sa.Column("cache_path", sa.Text(), nullable=False),
        sa.Column("size", sa.BigInteger(), nullable=False),
        sa.Column("md5", sa.String(length=32), nullable=False, server_default=""),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.UniqueConstraint("item_id", "vpk_name", name="uq_global_overlay_item_file_name"),
    )
    op.create_index("ix_global_overlay_item_files_item", "global_overlay_item_files", ["item_id"])
def downgrade() -> None:
    """Drop the global overlay tables and restore required job ownership."""
    op.drop_index("ix_global_overlay_item_files_item", table_name="global_overlay_item_files")
    op.drop_table("global_overlay_item_files")
    op.drop_index("ix_global_overlay_items_source", table_name="global_overlay_items")
    op.drop_table("global_overlay_items")
    op.drop_index("ix_global_overlay_sources_type", table_name="global_overlay_sources")
    op.drop_table("global_overlay_sources")
    # System jobs (user_id IS NULL) would violate the restored NOT NULL
    # constraint, so purge their logs first, then the jobs themselves.
    op.execute(
        "DELETE FROM job_logs WHERE job_id IN "
        "(SELECT id FROM jobs WHERE user_id IS NULL)"
    )
    op.execute("DELETE FROM jobs WHERE user_id IS NULL")
    with op.batch_alter_table("jobs") as batch_op:
        batch_op.alter_column("user_id", existing_type=sa.Integer(), nullable=False)

View file

@ -41,6 +41,20 @@ def create_user(username: str, admin: bool) -> None:
click.echo(f"created user {username}")
@click.command("refresh-global-overlays")
def refresh_global_overlays_command() -> None:
    """Ensure the managed global overlays exist and enqueue a refresh job.

    Only enqueues (or returns an existing queued/running) refresh job; the
    downloads and rebuilds run in the web worker, not in this process.
    """
    # Local import — presumably to avoid an import cycle with the services
    # package at module load time; TODO confirm before hoisting to file level.
    from l4d2web.services.global_overlays import (
        ensure_global_overlays,
        enqueue_refresh_global_overlays,
    )
    with session_scope() as db:
        ensure_global_overlays(db)
        # user_id=None marks the job as system-created.
        job = enqueue_refresh_global_overlays(db, user_id=None)
        # job.id is read inside the session scope while the instance is attached.
        click.echo(f"queued refresh_global_overlays job #{job.id}")
def register_cli(app) -> None:
    """Attach all left4me CLI commands to the Flask app."""
    for command in (promote_admin, create_user, refresh_global_overlays_command):
        app.cli.add_command(command)

View file

@ -63,6 +63,69 @@ class Overlay(Base):
updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
class GlobalOverlaySource(Base):
    """A managed global map source overlay (one row per upstream source).

    Pairs exactly one Overlay row with the upstream manifest it is refreshed
    from, plus bookkeeping about the most recent refresh.
    """

    __tablename__ = "global_overlay_sources"
    __table_args__ = (Index("ix_global_overlay_sources_type", "source_type"),)
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # Unique + CASCADE: each overlay has at most one source row, removed with it.
    overlay_id: Mapped[int] = mapped_column(
        ForeignKey("overlays.id", ondelete="CASCADE"), unique=True, nullable=False
    )
    # Stable key the seeder uses to repair missing rows
    # (e.g. "l4d2center-maps" / "cedapug-maps").
    source_key: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
    # Parser discriminator (e.g. "l4d2center_csv" vs "cedapug_custom_page").
    source_type: Mapped[str] = mapped_column(String(32), nullable=False)
    source_url: Mapped[str] = mapped_column(Text, nullable=False)
    # Hash of the last fetched manifest; empty string until first refresh.
    last_manifest_hash: Mapped[str] = mapped_column(String(64), default="", nullable=False)
    last_refreshed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    # Empty string when no error has been recorded for this source.
    last_error: Mapped[str] = mapped_column(Text, default="", nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
class GlobalOverlayItem(Base):
    """One upstream manifest item (a downloadable map archive) for a source."""

    __tablename__ = "global_overlay_items"
    __table_args__ = (
        UniqueConstraint("source_id", "item_key", name="uq_global_overlay_item_source_key"),
        Index("ix_global_overlay_items_source", "source_id"),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # CASCADE: items vanish with their source row.
    source_id: Mapped[int] = mapped_column(
        ForeignKey("global_overlay_sources.id", ondelete="CASCADE"), nullable=False
    )
    # Stable per-source identifier; unique within a source (see table args).
    item_key: Mapped[str] = mapped_column(String(255), nullable=False)
    display_name: Mapped[str] = mapped_column(String(255), default="", nullable=False)
    download_url: Mapped[str] = mapped_column(Text, nullable=False)
    # Expected output VPK plus its upstream-published size/md5 when available;
    # empty/None when the source does not publish them.
    expected_vpk_name: Mapped[str] = mapped_column(String(255), default="", nullable=False)
    expected_size: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
    expected_md5: Mapped[str] = mapped_column(String(32), default="", nullable=False)
    # HTTP revalidation metadata for change detection when no checksum exists.
    etag: Mapped[str] = mapped_column(String(255), default="", nullable=False)
    last_modified: Mapped[str] = mapped_column(String(255), default="", nullable=False)
    content_length: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
    last_downloaded_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    # Empty string when the last download/verification succeeded.
    last_error: Mapped[str] = mapped_column(Text, default="", nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
class GlobalOverlayItemFile(Base):
    """One extracted VPK belonging to an item.

    Kept separate from the item row because an archive may contain more than
    one `.vpk` file.
    """

    __tablename__ = "global_overlay_item_files"
    __table_args__ = (
        UniqueConstraint("item_id", "vpk_name", name="uq_global_overlay_item_file_name"),
        Index("ix_global_overlay_item_files_item", "item_id"),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # CASCADE: file rows vanish with their item row.
    item_id: Mapped[int] = mapped_column(
        ForeignKey("global_overlay_items.id", ondelete="CASCADE"), nullable=False
    )
    # Unique per item (see table args).
    vpk_name: Mapped[str] = mapped_column(String(255), nullable=False)
    # Relative path under the global overlay cache tree.
    cache_path: Mapped[str] = mapped_column(Text, nullable=False)
    size: Mapped[int] = mapped_column(BigInteger, nullable=False)
    # Empty string when no checksum was computed/published for this file.
    md5: Mapped[str] = mapped_column(String(32), default="", nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=now_utc, nullable=False)
class WorkshopItem(Base):
__tablename__ = "workshop_items"
@ -139,7 +202,7 @@ class Job(Base):
__tablename__ = "jobs"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
user_id: Mapped[int | None] = mapped_column(ForeignKey("users.id"), nullable=True)
server_id: Mapped[int | None] = mapped_column(ForeignKey("servers.id"), nullable=True)
overlay_id: Mapped[int | None] = mapped_column(ForeignKey("overlays.id"), nullable=True)
operation: Mapped[str] = mapped_column(String(32), nullable=False)

View file

@ -15,6 +15,7 @@ dependencies = [
"PyYAML>=6.0",
"gunicorn>=22.0",
"requests>=2.31",
"py7zr>=0.21",
]
[tool.setuptools]

View file

@ -6,7 +6,7 @@ from sqlalchemy import delete, func, select
from l4d2web.auth import current_user, require_login
from l4d2web.db import session_scope
from l4d2web.models import Blueprint as BlueprintModel
from l4d2web.models import BlueprintOverlay, Server
from l4d2web.models import BlueprintOverlay, Overlay, Server
bp = Blueprint("blueprint", __name__)
@ -37,6 +37,19 @@ def replace_blueprint_overlays(db, blueprint_id: int, overlay_ids: list[int]) ->
db.add(BlueprintOverlay(blueprint_id=blueprint_id, overlay_id=overlay_id, position=position))
def overlay_ids_authorized(db, overlay_ids: list[int], user_id: int) -> bool:
    """Return True iff every overlay id is a system overlay or owned by *user_id*."""
    requested = set(overlay_ids)
    if not requested:
        return True
    # Count how many of the requested overlays are visible to this user:
    # system overlays (user_id IS NULL) or the user's own overlays.
    visible = db.scalar(
        select(func.count(Overlay.id)).where(
            Overlay.id.in_(requested),
            Overlay.user_id.is_(None) | (Overlay.user_id == user_id),
        )
    )
    # Authorized only when no requested id was filtered out (or missing).
    return visible == len(requested)
@bp.post("/blueprints")
@require_login
def create_blueprint() -> Response:
@ -61,6 +74,8 @@ def create_blueprint() -> Response:
return Response("name is required", status=400)
with session_scope() as db:
if not overlay_ids_authorized(db, overlay_ids, user.id):
return Response("overlay not authorized", status=403)
blueprint = BlueprintModel(user_id=user.id, name=name, arguments=json.dumps(arguments), config=json.dumps(config))
db.add(blueprint)
db.flush()
@ -87,11 +102,14 @@ def update_blueprint_form(blueprint_id: int) -> Response:
)
if blueprint is None:
return Response(status=404)
overlay_ids = ordered_overlay_ids_from_form()
if not overlay_ids_authorized(db, overlay_ids, user.id):
return Response("overlay not authorized", status=403)
blueprint.name = name
blueprint.arguments = json.dumps(split_textarea_lines(request.form.get("arguments", "")))
blueprint.config = json.dumps(split_textarea_lines(request.form.get("config", "")))
replace_blueprint_overlays(db, blueprint.id, ordered_overlay_ids_from_form())
replace_blueprint_overlays(db, blueprint.id, overlay_ids)
return redirect(f"/blueprints/{blueprint_id}")

View file

@ -22,7 +22,11 @@ def format_sse_event(seq: int, event: str, data: str) -> str:
def can_access_job(job: Job, user: User) -> bool:
return user.admin or job.user_id == user.id
if user.admin:
return True
if job.user_id is None:
return False
return job.user_id == user.id
@bp.get("/jobs/<int:job_id>")
@ -34,7 +38,7 @@ def job_detail(job_id: int) -> str | Response:
with session_scope() as db:
row = db.execute(
select(Job, User, Server)
.join(User, User.id == Job.user_id)
.outerjoin(User, User.id == Job.user_id)
.outerjoin(Server, Server.id == Job.server_id)
.where(Job.id == job_id)
).first()

View file

@ -8,6 +8,7 @@ from l4d2host.paths import get_left4me_root
from l4d2web.auth import current_user, require_login
from l4d2web.db import session_scope
from l4d2web.models import BlueprintOverlay, Overlay
from l4d2web.services.global_overlays import MANAGED_GLOBAL_OVERLAY_TYPES, is_creatable_overlay_type
from l4d2web.services.overlay_creation import (
create_overlay_directory,
generate_overlay_path,
@ -17,9 +18,6 @@ from l4d2web.services.overlay_creation import (
bp = Blueprint("overlay", __name__)
VALID_TYPES = {"external", "workshop"}
def _is_managed_path(overlay: Overlay) -> bool:
return overlay.path == str(overlay.id)
@ -27,6 +25,8 @@ def _is_managed_path(overlay: Overlay) -> bool:
def _can_edit_overlay(overlay: Overlay, user) -> bool:
if user is None:
return False
if overlay.type in MANAGED_GLOBAL_OVERLAY_TYPES:
return False
if user.admin:
return True
if overlay.type == "external":
@ -57,7 +57,7 @@ def create_overlay() -> Response:
overlay_type = request.form.get("type", "external").strip().lower()
if not name:
return Response("missing fields", status=400)
if overlay_type not in VALID_TYPES:
if not is_creatable_overlay_type(overlay_type, admin=user.admin):
return Response(f"unknown overlay type: {overlay_type}", status=400)
if overlay_type == "external":

View file

@ -8,6 +8,7 @@ from l4d2web.db import session_scope
from l4d2web.models import Blueprint as BlueprintModel
from l4d2web.models import (
BlueprintOverlay,
GlobalOverlaySource,
Job,
Overlay,
OverlayWorkshopItem,
@ -42,6 +43,22 @@ def enqueue_runtime_install() -> Response:
return redirect("/admin/jobs")
@bp.post("/admin/global-overlays/refresh")
@require_admin
def enqueue_global_overlay_refresh() -> Response:
    """Admin action: make sure the managed overlay rows exist, then queue a
    coalesced refresh_global_overlays job attributed to the acting admin."""
    user = current_user()
    assert user is not None
    # Imported lazily here rather than at module top — NOTE(review): presumably
    # to avoid an import cycle with the services package; confirm.
    from l4d2web.services.global_overlays import (
        ensure_global_overlays,
        enqueue_refresh_global_overlays,
    )
    with session_scope() as db:
        ensure_global_overlays(db)
        enqueue_refresh_global_overlays(db, user_id=user.id)
    return redirect("/admin/jobs")
@bp.get("/admin/users")
@require_admin
def admin_users() -> str:
@ -56,7 +73,7 @@ def admin_jobs() -> str:
with session_scope() as db:
rows = db.execute(
select(Job, User, Server)
.join(User, User.id == Job.user_id)
.outerjoin(User, User.id == Job.user_id)
.outerjoin(Server, Server.id == Job.server_id)
.order_by(Job.created_at.desc())
).all()
@ -110,7 +127,7 @@ def server_detail(server_id: int):
blueprint = db.scalar(select(BlueprintModel).where(BlueprintModel.id == server.blueprint_id))
recent_job_rows = db.execute(
select(Job, User, Server)
.join(User, User.id == Job.user_id)
.outerjoin(User, User.id == Job.user_id)
.outerjoin(Server, Server.id == Job.server_id)
.where(Job.server_id == server.id)
.order_by(Job.created_at.desc())
@ -137,7 +154,7 @@ def server_jobs_page(server_id: int):
return Response(status=404)
rows = db.execute(
select(Job, User, Server)
.join(User, User.id == Job.user_id)
.outerjoin(User, User.id == Job.user_id)
.outerjoin(Server, Server.id == Job.server_id)
.where(Job.server_id == server.id)
.order_by(Job.created_at.desc())
@ -155,7 +172,7 @@ def overlays() -> str:
query = select(Overlay).order_by(Overlay.name)
if not user.admin:
query = query.where(
(Overlay.type == "external") | (Overlay.user_id == user.id)
Overlay.user_id.is_(None) | (Overlay.user_id == user.id)
)
overlays = db.scalars(query).all()
return render_template("overlays.html", overlays=overlays)
@ -170,16 +187,20 @@ def overlay_detail(overlay_id: int):
overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
if overlay is None:
return Response(status=404)
# Visibility: externals are visible to all; workshop overlays are
# visible to the owner and admins.
if overlay.type == "workshop" and not user.admin and overlay.user_id != user.id:
if not user.admin and overlay.user_id is not None and overlay.user_id != user.id:
return Response(status=403)
using_blueprints = db.scalars(
global_source = db.scalar(
select(GlobalOverlaySource).where(GlobalOverlaySource.overlay_id == overlay.id)
)
using_blueprints_query = (
select(BlueprintModel)
.join(BlueprintOverlay, BlueprintOverlay.blueprint_id == BlueprintModel.id)
.where(BlueprintOverlay.overlay_id == overlay.id)
.order_by(BlueprintModel.name)
).all()
)
if not user.admin:
using_blueprints_query = using_blueprints_query.where(BlueprintModel.user_id == user.id)
using_blueprints = db.scalars(using_blueprints_query).all()
workshop_items = []
if overlay.type == "workshop":
workshop_items = db.scalars(
@ -200,6 +221,7 @@ def overlay_detail(overlay_id: int):
return render_template(
"overlay_detail.html",
overlay=overlay,
global_source=global_source,
using_blueprints=using_blueprints,
workshop_items=workshop_items,
latest_build_job=latest_build_job,
@ -241,7 +263,11 @@ def blueprint_page(blueprint_id: int):
select(BlueprintOverlay.overlay_id, BlueprintOverlay.position)
.where(BlueprintOverlay.blueprint_id == blueprint.id)
).all()
all_overlays = db.scalars(select(Overlay).order_by(Overlay.name)).all()
all_overlays = db.scalars(
select(Overlay)
.where(Overlay.user_id.is_(None) | (Overlay.user_id == user.id))
.order_by(Overlay.name)
).all()
overlay_positions = {overlay_id: position + 1 for overlay_id, position in position_rows}
return render_template(

View file

@ -0,0 +1,106 @@
from __future__ import annotations
import hashlib
import os
import shutil
from pathlib import Path
import tempfile
from zipfile import ZipFile
import py7zr
import requests
from l4d2host.paths import get_left4me_root
REQUEST_TIMEOUT_SECONDS = 30
DOWNLOAD_CHUNK_BYTES = 1_048_576
def global_overlay_cache_root() -> Path:
    """Root directory of the managed global map cache."""
    root = get_left4me_root()
    return root / "global_overlay_cache"
def source_cache_root(source_key: str) -> Path:
    """Per-source cache directory; rejects empty or path-traversing keys."""
    unsafe = not source_key or "/" in source_key or ".." in source_key
    if unsafe:
        raise ValueError(f"invalid source_key: {source_key!r}")
    return global_overlay_cache_root() / source_key
def archive_dir(source_key: str) -> Path:
    """Directory holding downloaded (still-packed) archives for one source."""
    return source_cache_root(source_key).joinpath("archives")
def vpk_dir(source_key: str) -> Path:
    """Directory holding extracted .vpk files for one source."""
    return source_cache_root(source_key).joinpath("vpks")
def download_archive(url: str, target: Path, *, should_cancel=None) -> tuple[str, str, int | None]:
    """Stream *url* to *target*, publishing atomically via a ``.partial`` sibling.

    Returns ``(etag, last_modified, content_length)`` from the response headers
    ("" / None when absent). Raises ``requests.HTTPError`` on a non-2xx status
    and ``InterruptedError`` when *should_cancel* reports True mid-download; any
    failure removes the partial file so no half-written archive is left behind.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    partial = target.with_suffix(target.suffix + ".partial")
    # Use the response as a context manager: with stream=True the pooled
    # connection stays checked out until the body is consumed or closed, so
    # an early raise (HTTP error, cancel) would otherwise leak it until GC.
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS) as response:
        response.raise_for_status()
        etag = response.headers.get("ETag", "")
        last_modified = response.headers.get("Last-Modified", "")
        content_length_raw = response.headers.get("Content-Length")
        content_length = int(content_length_raw) if content_length_raw and content_length_raw.isdigit() else None
        try:
            with open(partial, "wb") as f:
                for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                    if should_cancel is not None and should_cancel():
                        raise InterruptedError("download cancelled")
                    if chunk:
                        f.write(chunk)
            # Atomic rename: readers never observe a partially written target.
            os.replace(partial, target)
        except BaseException:
            partial.unlink(missing_ok=True)
            raise
    return etag, last_modified, content_length
def safe_extract_zip_vpks(archive_path: Path, output_dir: Path) -> list[Path]:
    """Extract every ``.vpk`` member of a zip archive into *output_dir*.

    Member names that are absolute or contain traversal components are
    rejected outright. Returns the sorted list of extracted file paths;
    raises ValueError when the archive holds no ``.vpk`` files at all.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    results: list[Path] = []
    with ZipFile(archive_path) as archive:
        for info in archive.infolist():
            member_path = Path(info.filename)
            has_bad_part = any(piece in {"", ".", ".."} for piece in member_path.parts)
            if member_path.is_absolute() or has_bad_part:
                raise ValueError(f"unsafe archive member: {info.filename}")
            if member_path.suffix.lower() != ".vpk":
                continue
            # Flatten into output_dir: only the basename is kept.
            destination = output_dir / member_path.name
            with archive.open(info) as src, open(destination, "wb") as dst:
                shutil.copyfileobj(src, dst)
            results.append(destination)
    if not results:
        raise ValueError(f"archive {archive_path} did not contain any .vpk files")
    return sorted(results)
def safe_extract_7z_vpks(archive_path: Path, output_dir: Path) -> list[Path]:
    """Extract every ``.vpk`` from a 7z archive into *output_dir*.

    All member names are validated against absolute and traversal paths
    before anything is unpacked; extraction goes through a scratch directory
    and only the ``.vpk`` files are moved out. Returns the sorted extracted
    paths; raises ValueError when no ``.vpk`` file is found.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    collected: list[Path] = []
    with tempfile.TemporaryDirectory(prefix="left4me-7z-") as scratch:
        scratch_dir = Path(scratch)
        with py7zr.SevenZipFile(archive_path, mode="r") as archive:
            for member_name in archive.getnames():
                member = Path(member_name)
                bad = any(piece in {"", ".", ".."} for piece in member.parts)
                if member.is_absolute() or bad:
                    raise ValueError(f"unsafe archive member: {member_name}")
            archive.extractall(path=scratch_dir)
        for vpk in scratch_dir.rglob("*.vpk"):
            destination = output_dir / vpk.name
            shutil.move(str(vpk), str(destination))
            collected.append(destination)
    if not collected:
        raise ValueError(f"archive {archive_path} did not contain any .vpk files")
    return sorted(collected)
def extracted_vpk_md5(path: Path) -> str:
    """Return the hex MD5 digest of the file at *path*, streamed in 1 MiB chunks."""
    checksum = hashlib.md5()
    with open(path, "rb") as fh:
        while chunk := fh.read(1024 * 1024):
            checksum.update(chunk)
    return checksum.hexdigest()

View file

@ -0,0 +1,104 @@
from __future__ import annotations
import csv
from dataclasses import dataclass
import hashlib
import html as html_lib
import io
import json
from urllib.parse import urljoin, urlparse
import re
import requests
REQUEST_TIMEOUT_SECONDS = 30
L4D2CENTER_CSV_URL = "https://l4d2center.com/maps/servers/index.csv"
CEDAPUG_CUSTOM_URL = "https://cedapug.com/custom"
@dataclass(frozen=True, slots=True)
class GlobalMapManifestItem:
    """One downloadable map entry parsed from an upstream manifest."""

    # Stable identifier within its source (the vpk name for L4D2Center rows,
    # the archive basename for CEDAPUG rows).
    item_key: str
    # Human-readable label for display.
    display_name: str
    # Absolute URL of the archive to download.
    download_url: str
    # Optional integrity expectations; "" / None means the manifest did not
    # provide the value and the corresponding check is skipped downstream.
    expected_vpk_name: str = ""
    expected_size: int | None = None
    expected_md5: str = ""
def fetch_l4d2center_manifest() -> tuple[str, list[GlobalMapManifestItem]]:
    """Download the L4D2Center CSV index; return (sha256 of body, parsed items)."""
    reply = requests.get(L4D2CENTER_CSV_URL, timeout=REQUEST_TIMEOUT_SECONDS)
    reply.raise_for_status()
    body = reply.text
    return _sha256(body), parse_l4d2center_csv(body)
def fetch_cedapug_manifest() -> tuple[str, list[GlobalMapManifestItem]]:
    """Download the CEDAPUG custom-maps page; return (sha256 of body, parsed items)."""
    reply = requests.get(CEDAPUG_CUSTOM_URL, timeout=REQUEST_TIMEOUT_SECONDS)
    reply.raise_for_status()
    body = reply.text
    return _sha256(body), parse_cedapug_custom_html(body)
def parse_l4d2center_csv(raw: str) -> list[GlobalMapManifestItem]:
    """Parse the semicolon-delimited L4D2Center index into manifest items.

    The exact header is validated, rows missing a name or download link are
    skipped, and the md5 column is lower-cased. Raises ValueError when the
    header does not match.
    """
    reader = csv.DictReader(io.StringIO(raw), delimiter=";")
    if reader.fieldnames != ["Name", "Size", "md5", "Download link"]:
        raise ValueError("expected L4D2Center CSV header: Name;Size;md5;Download link")
    parsed: list[GlobalMapManifestItem] = []
    for record in reader:
        map_name = (record.get("Name") or "").strip()
        link = (record.get("Download link") or "").strip()
        if not map_name or not link:
            continue
        size_text = (record.get("Size") or "").strip()
        parsed.append(
            GlobalMapManifestItem(
                item_key=map_name,
                display_name=map_name,
                download_url=link,
                # The CSV "Name" column is the vpk filename itself.
                expected_vpk_name=map_name,
                expected_size=int(size_text) if size_text else None,
                expected_md5=(record.get("md5") or "").strip().lower(),
            )
        )
    return parsed
def parse_cedapug_custom_html(raw: str) -> list[GlobalMapManifestItem]:
    """Extract map download entries from the CEDAPUG custom-maps page.

    The page embeds a JSON array passed to ``renderCustomMapDownloads(...)``;
    each row is ``[_, label_html, link]``. Only non-empty, host-relative
    links are kept; they are resolved against the page URL. Raises
    ValueError when the marker script is absent.
    """
    found = re.search(r"renderCustomMapDownloads\((\[.*?\])\)</script>", raw, re.DOTALL)
    if found is None:
        raise ValueError("CEDAPUG page did not contain renderCustomMapDownloads data")
    entries: list[GlobalMapManifestItem] = []
    for row in json.loads(found.group(1)):
        if len(row) < 3:
            continue
        link = str(row[2])
        # Drop empty links and absolute URLs pointing off-site.
        if not link or link.startswith("http"):
            continue
        absolute_url = urljoin(CEDAPUG_CUSTOM_URL, link)
        basename = urlparse(absolute_url).path.rsplit("/", 1)[-1]
        entries.append(
            GlobalMapManifestItem(
                item_key=basename,
                display_name=_strip_html(str(row[1])),
                download_url=absolute_url,
            )
        )
    return entries
def _strip_html(raw: str) -> str:
no_tags = re.sub(r"<[^>]+>", "", raw)
return html_lib.unescape(no_tags).strip()
def _sha256(raw: str) -> str:
return hashlib.sha256(raw.encode("utf-8")).hexdigest()

View file

@ -0,0 +1,168 @@
from __future__ import annotations
import shutil
from datetime import UTC, datetime
from pathlib import Path
import tempfile
from sqlalchemy import select
from l4d2web.db import session_scope
from l4d2web.models import GlobalOverlayItem, GlobalOverlayItemFile, GlobalOverlaySource, Overlay
from l4d2web.services.global_map_cache import (
archive_dir,
download_archive,
extracted_vpk_md5,
safe_extract_7z_vpks,
safe_extract_zip_vpks,
vpk_dir,
)
from l4d2web.services.global_map_sources import (
GlobalMapManifestItem,
fetch_cedapug_manifest,
fetch_l4d2center_manifest,
)
from l4d2web.services.global_overlays import ensure_global_overlays
def refresh_global_overlays(*, on_stdout, on_stderr, should_cancel) -> list[str]:
    """Refresh both managed global map overlays and rebuild their symlinks.

    Ensures the managed overlay rows exist, then for each source fetches the
    upstream manifest, reconciles DB items plus the on-disk cache, and rebuilds
    the overlay's symlink tree. Returns the sorted source keys refreshed so
    far; stops early (returning the partial list) when *should_cancel* reports
    True.
    """
    with session_scope() as db:
        ensure_global_overlays(db)
    refreshed: list[str] = []
    for source_key, fetcher in (
        ("l4d2center-maps", fetch_l4d2center_manifest),
        ("cedapug-maps", fetch_cedapug_manifest),
    ):
        if should_cancel():
            on_stderr("global overlay refresh cancelled before manifest fetch")
            # Sorted for consistency with the normal return path below.
            return sorted(refreshed)
        manifest_hash, manifest_items = fetcher()
        on_stdout(f"{source_key}: fetched manifest with {len(manifest_items)} item(s)")
        overlay = _refresh_source(
            source_key,
            manifest_hash,
            manifest_items,
            on_stdout=on_stdout,
            on_stderr=on_stderr,
            should_cancel=should_cancel,
        )
        build_global_overlay(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel)
        refreshed.append(source_key)
    return sorted(refreshed)
def _refresh_source(source_key: str, manifest_hash: str, manifest_items: list[GlobalMapManifestItem], *, on_stdout, on_stderr, should_cancel) -> Overlay:
    """Reconcile DB rows for one source against its manifest, then download items.

    DB items whose key vanished from the manifest are deleted; new ones are
    inserted and all are restamped with manifest metadata. Downloads run only
    after the reconciliation session has closed. Returns the Overlay row,
    expunged so the caller may use it detached from any session.

    Raises ValueError when the source or its overlay row is missing.
    """
    now = datetime.now(UTC)
    desired_keys = {item.item_key for item in manifest_items}
    with session_scope() as db:
        source = db.scalar(select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == source_key))
        if source is None:
            raise ValueError(f"global overlay source {source_key!r} not found")
        overlay = db.scalar(select(Overlay).where(Overlay.id == source.overlay_id))
        if overlay is None:
            raise ValueError(f"overlay for source {source_key!r} not found")
        existing_items = {item.item_key: item for item in db.scalars(select(GlobalOverlayItem).where(GlobalOverlayItem.source_id == source.id)).all()}
        # Delete items that no longer appear upstream.
        for old_key, old_item in list(existing_items.items()):
            if old_key not in desired_keys:
                db.delete(old_item)
        # Upsert manifest items, refreshing metadata on every run.
        for manifest_item in manifest_items:
            item = existing_items.get(manifest_item.item_key)
            if item is None:
                item = GlobalOverlayItem(source_id=source.id, item_key=manifest_item.item_key, download_url=manifest_item.download_url)
                db.add(item)
                db.flush()
            item.display_name = manifest_item.display_name
            item.download_url = manifest_item.download_url
            item.expected_vpk_name = manifest_item.expected_vpk_name
            item.expected_size = manifest_item.expected_size
            item.expected_md5 = manifest_item.expected_md5
            item.updated_at = now
        source.last_manifest_hash = manifest_hash
        source.last_refreshed_at = now
        source.last_error = ""
        source.updated_at = now
        # Detach the overlay so it stays usable after the session closes.
        db.expunge(overlay)
    # Downloads happen outside the session; each item manages its own state.
    for manifest_item in manifest_items:
        if should_cancel():
            on_stderr(f"{source_key}: refresh cancelled during downloads")
            return overlay
        _refresh_item(source_key, manifest_item, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel)
    return overlay
def _refresh_item(source_key: str, manifest_item: GlobalMapManifestItem, *, on_stdout, on_stderr, should_cancel) -> None:
    """Download/extract one manifest item and record the outcome in the DB.

    Download or extraction failures are deliberately swallowed: the error text
    is stored on the item's last_error column and logged via *on_stderr*, so a
    single broken item does not abort the whole refresh. On success the item's
    file rows are replaced wholesale and its HTTP caching metadata updated.
    """
    try:
        files, etag, last_modified, content_length = download_and_extract_item(source_key, manifest_item, should_cancel=should_cancel)
    except Exception as exc:
        # Best-effort bookkeeping; source/item may have vanished meanwhile.
        with session_scope() as db:
            source = db.scalar(select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == source_key))
            if source is not None:
                item = db.scalar(select(GlobalOverlayItem).where(GlobalOverlayItem.source_id == source.id, GlobalOverlayItem.item_key == manifest_item.item_key))
                if item is not None:
                    item.last_error = str(exc)
        on_stderr(f"{source_key}: {manifest_item.item_key}: {exc}")
        return
    now = datetime.now(UTC)
    with session_scope() as db:
        source = db.scalar(select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == source_key))
        if source is None:
            raise ValueError(f"global overlay source {source_key!r} not found")
        item = db.scalar(select(GlobalOverlayItem).where(GlobalOverlayItem.source_id == source.id, GlobalOverlayItem.item_key == manifest_item.item_key))
        if item is None:
            raise ValueError(f"global overlay item {manifest_item.item_key!r} not found")
        # Replace file rows in one session so readers never see a mix.
        db.query(GlobalOverlayItemFile).filter_by(item_id=item.id).delete()
        for vpk_name, cache_path, size, md5 in files:
            db.add(GlobalOverlayItemFile(item_id=item.id, vpk_name=vpk_name, cache_path=cache_path, size=size, md5=md5))
        item.etag = etag
        item.last_modified = last_modified
        item.content_length = content_length
        item.last_downloaded_at = now
        item.last_error = ""
        item.updated_at = now
    on_stdout(f"{source_key}: refreshed {manifest_item.item_key} ({len(files)} vpk file(s))")
def download_and_extract_item(source_key: str, item: GlobalMapManifestItem, *, should_cancel) -> tuple[list[tuple[str, str, int, str]], str, str, int | None]:
    """Fetch one item's archive and extract its .vpk files into the source cache.

    Returns ``([(vpk_name, cache_relative_path, size, md5), ...], etag,
    last_modified, content_length)``. Size and md5 are validated against the
    manifest expectations when present. Raises ValueError for unsupported
    archive extensions, failed validation, or when nothing usable is extracted.
    """
    archives = archive_dir(source_key)
    vpks = vpk_dir(source_key)
    archives.mkdir(parents=True, exist_ok=True)
    vpks.mkdir(parents=True, exist_ok=True)
    # Archive filename comes straight from the URL path — assumes upstream
    # URLs end in a usable .zip/.7z basename with no query string; TODO confirm.
    archive_name = item.download_url.rsplit("/", 1)[-1]
    archive_path = archives / archive_name
    etag, last_modified, content_length = download_archive(item.download_url, archive_path, should_cancel=should_cancel)
    with tempfile.TemporaryDirectory(prefix="left4me-global-map-") as tmp:
        tmp_dir = Path(tmp)
        if archive_name.lower().endswith(".7z"):
            extracted = safe_extract_7z_vpks(archive_path, tmp_dir)
        elif archive_name.lower().endswith(".zip"):
            extracted = safe_extract_zip_vpks(archive_path, tmp_dir)
        else:
            raise ValueError(f"unsupported archive extension for {archive_name}")
        results: list[tuple[str, str, int, str]] = []
        for path in extracted:
            # When the manifest names a specific vpk, ignore any others.
            if item.expected_vpk_name and path.name != item.expected_vpk_name:
                continue
            size = path.stat().st_size
            md5 = extracted_vpk_md5(path)
            if item.expected_size is not None and size != item.expected_size:
                raise ValueError(f"{path.name} size mismatch: expected {item.expected_size}, got {size}")
            if item.expected_md5 and md5 != item.expected_md5:
                raise ValueError(f"{path.name} md5 mismatch: expected {item.expected_md5}, got {md5}")
            final = vpks / path.name
            # shutil.move survives EXDEV when /tmp and the cache live on
            # different filesystems (a plain os.rename would fail).
            shutil.move(str(path), str(final))
            results.append((path.name, f"{source_key}/vpks/{path.name}", size, md5))
    if not results:
        raise ValueError(f"no expected .vpk files extracted from {archive_name}")
    return results, etag, last_modified, content_length
def build_global_overlay(overlay: Overlay, *, on_stdout, on_stderr, should_cancel) -> None:
    """Run the symlink builder registered for *overlay*'s type."""
    # Imported lazily — presumably to avoid a cycle with overlay_builders.
    from l4d2web.services.overlay_builders import BUILDERS

    if overlay.type not in BUILDERS:
        raise ValueError(f"no builder registered for overlay type {overlay.type!r}")
    BUILDERS[overlay.type].build(overlay, on_stdout=on_stdout, on_stderr=on_stderr, should_cancel=should_cancel)

View file

@ -0,0 +1,112 @@
from __future__ import annotations
from dataclasses import dataclass
import os
from sqlalchemy import select
from sqlalchemy.orm import Session
from l4d2host.paths import get_left4me_root
from l4d2web.models import GlobalOverlaySource, Job, Overlay
from l4d2web.services.overlay_creation import generate_overlay_path
@dataclass(frozen=True)
class ManagedGlobalOverlay:
    """Static description of one system-managed global map overlay."""

    # Overlay display name; also used as the GlobalOverlaySource source_key.
    name: str
    # Overlay.type value; selects the builder and marks the overlay managed.
    overlay_type: str
    # Discriminator for how the upstream manifest is fetched/parsed.
    source_type: str
    # Upstream URL the manifest is fetched from.
    source_url: str
# The system-managed overlays; ensure_global_overlays() keeps matching
# Overlay / GlobalOverlaySource rows in sync with this tuple.
GLOBAL_OVERLAYS = (
    ManagedGlobalOverlay(
        name="l4d2center-maps",
        overlay_type="l4d2center_maps",
        source_type="l4d2center_csv",
        source_url="https://l4d2center.com/maps/servers/index.csv",
    ),
    ManagedGlobalOverlay(
        name="cedapug-maps",
        overlay_type="cedapug_maps",
        source_type="cedapug_custom_page",
        source_url="https://cedapug.com/custom",
    ),
)
# Overlay types owned by the system; users (including admins) cannot edit them.
MANAGED_GLOBAL_OVERLAY_TYPES = {overlay.overlay_type for overlay in GLOBAL_OVERLAYS}
# Overlay types creatable through the web UI, by privilege level.
USER_CREATABLE_TYPES = {"workshop"}
ADMIN_CREATABLE_TYPES = {"external", "workshop"}
def is_creatable_overlay_type(overlay_type: str, *, admin: bool) -> bool:
    """Return True when a user with the given privilege may create this type."""
    if admin:
        return overlay_type in ADMIN_CREATABLE_TYPES
    return overlay_type in USER_CREATABLE_TYPES
def ensure_global_overlays(session: Session) -> set[str]:
    """Create or resync the managed overlay + source rows for GLOBAL_OVERLAYS.

    Idempotent: existing rows are re-pointed at the managed type/owner/URL and
    a missing overlay directory is created on disk. Returns the source keys
    whose GlobalOverlaySource row was newly created during this call.
    """
    created_sources: set[str] = set()
    for managed in GLOBAL_OVERLAYS:
        overlay = session.scalar(
            select(Overlay).where(Overlay.name == managed.name, Overlay.user_id.is_(None))
        )
        overlay_created = overlay is None
        if overlay is None:
            # Insert first so the overlay id exists, then derive its path.
            overlay = Overlay(name=managed.name, path="", type=managed.overlay_type, user_id=None)
            session.add(overlay)
            session.flush()
            overlay.path = generate_overlay_path(overlay.id)
        else:
            # Re-assert system ownership in case the row drifted.
            overlay.type = managed.overlay_type
            overlay.user_id = None
            if not overlay.path:
                overlay.path = generate_overlay_path(overlay.id)
        target = get_left4me_root() / "overlays" / overlay.path
        # NOTE(review): for a freshly created overlay exist_ok is False, so a
        # leftover directory at the new path raises — looks intentional, to
        # surface stale on-disk state; confirm.
        os.makedirs(target, exist_ok=not overlay_created)
        source = session.scalar(
            select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == managed.name)
        )
        if source is None:
            source = GlobalOverlaySource(
                overlay_id=overlay.id,
                source_key=managed.name,
                source_type=managed.source_type,
                source_url=managed.source_url,
            )
            session.add(source)
            created_sources.add(managed.name)
        else:
            source.overlay_id = overlay.id
            source.source_type = managed.source_type
            source.source_url = managed.source_url
    session.flush()
    return created_sources
def enqueue_refresh_global_overlays(session: Session, *, user_id: int | None) -> Job:
    """Enqueue a coalesced refresh_global_overlays job.

    When a refresh job is already queued, running, or cancelling, that job is
    returned instead of creating a duplicate. *user_id* may be None for
    system-initiated (timer) refreshes.
    """
    pending = session.scalar(
        select(Job)
        .where(
            Job.operation == "refresh_global_overlays",
            Job.state.in_({"queued", "running", "cancelling"}),
        )
        .order_by(Job.created_at, Job.id)
    )
    if pending is not None:
        return pending
    new_job = Job(
        user_id=user_id,
        server_id=None,
        overlay_id=None,
        operation="refresh_global_overlays",
        state="queued",
    )
    session.add(new_job)
    session.flush()
    return new_job

View file

@ -27,7 +27,7 @@ TERMINAL_JOB_STATES = {"succeeded", "failed", "cancelled"}
ACTIVE_JOB_STATES = {"running", "cancelling"}
SERVER_OPERATIONS = {"initialize", "start", "stop", "delete"}
OVERLAY_OPERATIONS = {"build_overlay"}
GLOBAL_OPERATIONS = {"install", "refresh_workshop_items"}
GLOBAL_OPERATIONS = {"install", "refresh_workshop_items", "refresh_global_overlays"}
WORKSHOP_REFRESH_DOWNLOAD_WORKERS = 1
_claim_lock = threading.Lock()
@ -40,6 +40,7 @@ _workers_started = False
class SchedulerState:
install_running: bool = False
refresh_running: bool = False
refresh_global_overlays_running: bool = False
running_servers: set[int] = field(default_factory=set)
running_overlays: set[int] = field(default_factory=set)
blocked_servers_by_overlay: set[int] = field(default_factory=set)
@ -62,6 +63,7 @@ def can_start(job, state: SchedulerState) -> bool:
return (
not state.install_running
and not state.refresh_running
and not state.refresh_global_overlays_running
and len(state.running_servers) == 0
and len(state.running_overlays) == 0
)
@ -69,17 +71,26 @@ def can_start(job, state: SchedulerState) -> bool:
return (
not state.install_running
and not state.refresh_running
and not state.refresh_global_overlays_running
and len(state.running_servers) == 0
and len(state.running_overlays) == 0
)
if job.operation == "refresh_global_overlays":
return (
not state.install_running
and not state.refresh_running
and not state.refresh_global_overlays_running
and len(state.running_servers) == 0
and len(state.running_overlays) == 0
)
if job.operation == "build_overlay":
if state.install_running or state.refresh_running:
if state.install_running or state.refresh_running or state.refresh_global_overlays_running:
return False
if job.overlay_id is None:
return False
return job.overlay_id not in state.running_overlays
# Server operations from here on.
if state.install_running or state.refresh_running:
if state.install_running or state.refresh_running or state.refresh_global_overlays_running:
return False
if job.server_id is None:
return False
@ -98,6 +109,8 @@ def build_scheduler_state(session: Session) -> SchedulerState:
state.install_running = True
elif job.operation == "refresh_workshop_items":
state.refresh_running = True
elif job.operation == "refresh_global_overlays":
state.refresh_global_overlays_running = True
elif job.operation == "build_overlay" and job.overlay_id is not None:
state.running_overlays.add(job.overlay_id)
elif job.server_id is not None:
@ -247,6 +260,15 @@ def run_job(job_id: int) -> None:
on_stderr=on_stderr,
should_cancel=should_cancel,
)
elif operation == "refresh_global_overlays":
_run_with_boundaries(
"refresh",
"global overlays",
_run_refresh_global_overlays,
on_stdout=on_stdout,
on_stderr=on_stderr,
should_cancel=should_cancel,
)
elif operation == "build_overlay":
if overlay_id_for_job is None:
raise ValueError("build_overlay job has no overlay_id")
@ -368,6 +390,21 @@ def _run_build_overlay(
)
def _run_refresh_global_overlays(
*,
on_stdout: Callable[[str], None],
on_stderr: Callable[[str], None],
should_cancel: Callable[[], bool],
) -> list[str]:
from l4d2web.services.global_overlay_refresh import refresh_global_overlays
return refresh_global_overlays(
on_stdout=on_stdout,
on_stderr=on_stderr,
should_cancel=should_cancel,
)
def _run_refresh_workshop_items(
*,
on_stdout: Callable[[str], None],

View file

@ -8,12 +8,16 @@ from l4d2web.db import session_scope
from l4d2web.models import (
Blueprint,
BlueprintOverlay,
GlobalOverlayItem,
GlobalOverlayItemFile,
GlobalOverlaySource,
Overlay,
OverlayWorkshopItem,
Server,
WorkshopItem,
)
from l4d2web.services import host_commands
from l4d2web.services.global_map_cache import global_overlay_cache_root
from l4d2web.services.spec_yaml import write_temp_spec
from l4d2web.services.workshop_paths import cache_path
@ -79,6 +83,7 @@ def initialize_server(server_id: int, on_stdout=None, on_stderr=None, should_can
# them, but we don't want to mount a partial overlay silently — fail
# loudly with the missing IDs.
_check_workshop_overlay_caches(blueprint_id=blueprint.id)
_check_global_overlay_caches(blueprint_id=blueprint.id)
spec_path = write_temp_spec(build_server_spec_payload(server, blueprint, overlay_refs))
try:
@ -173,6 +178,36 @@ def _check_workshop_overlay_caches(*, blueprint_id: int) -> None:
)
def _check_global_overlay_caches(*, blueprint_id: int) -> None:
    """Raise if any global map overlay attached to this blueprint has manifest
    items that aren't yet in the global_overlay_cache. Mirrors the workshop
    cache check to surface partial cache state at initialize time.

    Raises RuntimeError listing, per overlay, the vpk names whose cached file
    is missing on disk.
    """
    with session_scope() as db:
        rows = db.execute(
            select(Overlay.name, GlobalOverlayItemFile.vpk_name, GlobalOverlayItemFile.cache_path)
            .join(BlueprintOverlay, BlueprintOverlay.overlay_id == Overlay.id)
            .join(GlobalOverlaySource, GlobalOverlaySource.overlay_id == Overlay.id)
            .join(GlobalOverlayItem, GlobalOverlayItem.source_id == GlobalOverlaySource.id)
            .join(GlobalOverlayItemFile, GlobalOverlayItemFile.item_id == GlobalOverlayItem.id)
            .where(BlueprintOverlay.blueprint_id == blueprint_id)
        ).all()
    missing: dict[str, list[str]] = {}
    root = global_overlay_cache_root()
    for overlay_name, vpk_name, cache_path_value in rows:
        if not (root / cache_path_value).exists():
            missing.setdefault(overlay_name, []).append(vpk_name)
    if not missing:
        return
    details = []
    for overlay_name, names in sorted(missing.items()):
        details.append(f"overlay {overlay_name!r}: missing {', '.join(sorted(names))}")
    raise RuntimeError("global overlay content missing — " + "; ".join(details))
def start_server(server_id: int, on_stdout=None, on_stderr=None, should_cancel=None) -> None:
server, _, _ = load_server_blueprint_bundle(server_id)
host_commands.run_command(

View file

@ -16,7 +16,8 @@ from sqlalchemy import select
from l4d2host.paths import get_left4me_root
from l4d2web.db import session_scope
from l4d2web.models import Overlay, OverlayWorkshopItem, WorkshopItem
from l4d2web.models import GlobalOverlayItem, GlobalOverlayItemFile, GlobalOverlaySource, Overlay, OverlayWorkshopItem, WorkshopItem
from l4d2web.services.global_map_cache import global_overlay_cache_root
from l4d2web.services.workshop_paths import cache_path, workshop_cache_root
@ -179,6 +180,97 @@ class WorkshopBuilder:
)
class GlobalMapOverlayBuilder:
    """Reconcile symlinks for managed global map overlays."""

    def build(
        self,
        overlay: Overlay,
        *,
        on_stdout: LogSink,
        on_stderr: LogSink,
        should_cancel: CancelCheck,
    ) -> None:
        """Make the overlay's addons/ symlinks match its cached vpk files.

        Symlinks under ``left4dead2/addons`` are created/removed so they point
        into this source's vpk cache. Links pointing anywhere else — including
        other areas of the global cache — are left untouched. Cancellation may
        leave a partially reconciled tree; a later build converges it.
        """
        addons_dir = _overlay_root(overlay) / "left4dead2" / "addons"
        addons_dir.mkdir(parents=True, exist_ok=True)
        with session_scope() as db:
            source = db.scalar(select(GlobalOverlaySource).where(GlobalOverlaySource.overlay_id == overlay.id))
            if source is None:
                raise ValueError(f"global overlay source for overlay {overlay.id} not found")
            rows = db.execute(
                select(GlobalOverlayItemFile.vpk_name, GlobalOverlayItemFile.cache_path)
                .join(GlobalOverlayItem, GlobalOverlayItem.id == GlobalOverlayItemFile.item_id)
                .where(GlobalOverlayItem.source_id == source.id)
            ).all()
            # Copy the key out before the session closes.
            source_key = source.source_key
        cache_root = global_overlay_cache_root().resolve()
        source_vpk_root = (global_overlay_cache_root() / source_key / "vpks").resolve()
        # Desired state: vpk name -> resolved cache file. Rows whose cached
        # file is missing or escapes this source's vpk dir are skipped.
        desired: dict[str, Path] = {}
        skipped = 0
        for vpk_name, cache_path_value in rows:
            target = (global_overlay_cache_root() / cache_path_value).resolve()
            if not _is_under(target, source_vpk_root) or not target.exists():
                on_stderr(f"global overlay {overlay.name!r}: missing cache file for {vpk_name}")
                skipped += 1
                continue
            desired[vpk_name] = target
        # Current state: only symlinks into this source's vpk dir are "ours".
        existing: dict[str, Path] = {}
        for entry in os.scandir(addons_dir):
            if not entry.is_symlink():
                continue
            try:
                # NOTE(review): a relative link target would resolve against
                # the process CWD, not addons_dir. Our own links are created
                # absolute below, so this only affects foreign links; confirm.
                resolved = Path(os.readlink(entry.path)).resolve(strict=False)
            except OSError:
                continue
            if _is_under(resolved, source_vpk_root):
                existing[entry.name] = resolved
            elif _is_under(resolved, cache_root):
                on_stderr(f"global overlay {overlay.name!r}: leaving foreign cache symlink {entry.name}")
        created = 0
        removed = 0
        unchanged = 0
        # Pass 1: drop links that are stale or point at the wrong cache file.
        for name, current_target in existing.items():
            if should_cancel():
                on_stderr("global overlay build cancelled mid-removal")
                return
            desired_target = desired.get(name)
            if desired_target is None:
                os.unlink(addons_dir / name)
                removed += 1
            elif current_target == desired_target:
                unchanged += 1
            else:
                # Wrong target: removed here, recreated in pass 2.
                os.unlink(addons_dir / name)
        current_names = {
            name for name, current_target in existing.items() if name in desired and current_target == desired[name]
        }
        # Pass 2: create missing links, refusing to clobber anything foreign.
        for name, target in desired.items():
            if should_cancel():
                on_stderr("global overlay build cancelled mid-creation")
                return
            if name in current_names:
                continue
            link_path = addons_dir / name
            if link_path.exists() and not link_path.is_symlink():
                on_stderr(f"refusing to overwrite non-symlink at {link_path}")
                continue
            if link_path.is_symlink():
                on_stderr(f"refusing to overwrite foreign symlink at {link_path}")
                continue
            os.symlink(str(target), str(link_path))
            created += 1
        on_stdout(
            f"global overlay {overlay.name!r}: created={created} removed={removed} "
            f"unchanged={unchanged} skipped(missing)={skipped}"
        )
def _is_under(path: Path, root: Path) -> bool:
try:
path.relative_to(root)
@ -190,4 +282,6 @@ def _is_under(path: Path, root: Path) -> bool:
BUILDERS: dict[str, OverlayBuilder] = {
"external": ExternalBuilder(),
"workshop": WorkshopBuilder(),
"l4d2center_maps": GlobalMapOverlayBuilder(),
"cedapug_maps": GlobalMapOverlayBuilder(),
}

View file

@ -17,7 +17,7 @@
<td><a href="/jobs/{{ job.id }}">#{{ job.id }}</a></td>
<td>{{ job.operation }}</td>
<td>{{ job.state }}</td>
{% if show_user %}<td>{{ user.username }}</td>{% endif %}
{% if show_user %}<td>{{ user.username if user else "system" }}</td>{% endif %}
{% if show_server %}<td>{% if server %}<a href="/servers/{{ server.id }}">{{ server.name }}</a>{% else %}-{% endif %}</td>{% endif %}
<td>{{ job.created_at }}</td>
<td>{{ job.finished_at or "-" }}</td>

View file

@ -28,4 +28,13 @@
<button type="submit">Refresh all workshop items</button>
</form>
</section>
<section class="panel">
<h2>Global map overlays</h2>
<p class="muted">Queue a refresh for managed L4D2Center and CEDAPUG map overlays.</p>
<form method="post" action="/admin/global-overlays/refresh">
<input type="hidden" name="csrf_token" value="{{ session.get('csrf_token', '') }}">
<button type="submit">Refresh global overlays</button>
</form>
</section>
{% endblock %}

View file

@ -19,7 +19,7 @@
<tbody>
<tr><th>Operation</th><td>{{ job.operation }}</td></tr>
<tr><th>State</th><td>{{ job.state }}</td></tr>
<tr><th>User</th><td>{{ owner.username }}</td></tr>
<tr><th>User</th><td>{{ owner.username if owner else "system" }}</td></tr>
<tr><th>Server</th><td>{% if server %}<a href="/servers/{{ server.id }}">{{ server.name }}</a>{% else %}-{% endif %}</td></tr>
<tr><th>Created</th><td>{{ job.created_at }}</td></tr>
<tr><th>Started</th><td>{{ job.started_at or "-" }}</td></tr>

View file

@ -6,7 +6,7 @@
<section class="panel">
<div class="page-heading">
<h1>Overlay: {{ overlay.name }}</h1>
{% set can_edit = g.user.admin or (overlay.type == 'workshop' and overlay.user_id == g.user.id) %}
{% set can_edit = overlay.type not in ['l4d2center_maps', 'cedapug_maps'] and (g.user.admin or (overlay.type == 'workshop' and overlay.user_id == g.user.id)) %}
{% if can_edit %}
<button type="button" class="danger" data-modal-open="delete-overlay-modal">Delete</button>
{% endif %}
@ -31,6 +31,20 @@
</table>
</section>
{% if global_source %}
<section class="panel">
<h2>Global source</h2>
<table class="definition-table">
<tbody>
<tr><th>Source key</th><td>{{ global_source.source_key }}</td></tr>
<tr><th>Source URL</th><td><a href="{{ global_source.source_url }}">{{ global_source.source_url }}</a></td></tr>
<tr><th>Last refreshed</th><td>{{ global_source.last_refreshed_at or "Never" }}</td></tr>
<tr><th>Last error</th><td>{{ global_source.last_error or "None" }}</td></tr>
</tbody>
</table>
</section>
{% endif %}
{% if overlay.type == 'workshop' %}
<section class="panel">
<div class="page-heading">

View file

@ -79,6 +79,86 @@ def test_user_can_create_private_blueprint(user_client) -> None:
assert response.status_code == 201
def _create_other_users_private_overlay() -> int:
    """Create a second user owning a private workshop overlay; return its id."""
    with session_scope() as session:
        mallory = User(username="mallory", password_digest=hash_password("secret"), admin=False)
        session.add(mallory)
        session.flush()
        private_overlay = Overlay(
            name="mallory-private",
            path="mallory-private",
            type="workshop",
            user_id=mallory.id,
        )
        session.add(private_overlay)
        session.flush()
        return private_overlay.id
def test_user_cannot_create_blueprint_with_other_users_private_overlay(user_client) -> None:
    """Referencing another user's private overlay in a new blueprint yields 403."""
    foreign_overlay_id = _create_other_users_private_overlay()
    response = user_client.post(
        "/blueprints",
        data=json.dumps(
            {
                "name": "bad",
                "arguments": [],
                "config": [],
                "overlay_ids": [foreign_overlay_id],
            }
        ),
        content_type="application/json",
        headers={"X-CSRF-Token": "test-token"},
    )
    assert response.status_code == 403
def test_user_cannot_update_blueprint_with_other_users_private_overlay(user_client) -> None:
    """Swapping a foreign private overlay into an existing blueprint yields 403."""
    foreign_overlay_id = _create_other_users_private_overlay()
    created = user_client.post(
        "/blueprints",
        data={"name": "comp", "arguments": "", "config": "", "overlay_ids": ["1"]},
        headers={"X-CSRF-Token": "test-token"},
    )
    assert created.status_code == 302
    updated = user_client.post(
        "/blueprints/1",
        data={
            "name": "edited",
            "arguments": "",
            "config": "",
            "overlay_ids": [str(foreign_overlay_id)],
        },
        headers={"X-CSRF-Token": "test-token"},
    )
    assert updated.status_code == 403
def test_user_can_create_blueprint_with_system_overlay(user_client) -> None:
    """Any user may reference overlay id 1 (presumably a system overlay seeded
    by the fixture — confirm against conftest) and the link row is persisted."""
    response = user_client.post(
        "/blueprints",
        data=json.dumps(
            {
                "name": "system-ok",
                "arguments": [],
                "config": [],
                "overlay_ids": [1],
            }
        ),
        content_type="application/json",
        headers={"X-CSRF-Token": "test-token"},
    )
    assert response.status_code == 201
    blueprint_id = response.get_json()["id"]
    with session_scope() as session:
        link = session.query(BlueprintOverlay).filter_by(blueprint_id=blueprint_id, overlay_id=1).one()
        assert link.position == 0
def test_delete_blueprint_blocked_when_in_use(linked_blueprint) -> None:
client, blueprint_id = linked_blueprint
response = client.delete(f"/blueprints/{blueprint_id}", headers={"X-CSRF-Token": "test-token"})

View file

@ -0,0 +1,49 @@
from pathlib import Path
from zipfile import ZipFile
from l4d2web.services.global_map_cache import (
extracted_vpk_md5,
global_overlay_cache_root,
safe_extract_zip_vpks,
source_cache_root,
)
def test_global_overlay_cache_paths(tmp_path, monkeypatch):
    """Cache roots are derived from the LEFT4ME_ROOT environment variable."""
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    cache_root = tmp_path / "global_overlay_cache"
    assert global_overlay_cache_root() == cache_root
    assert source_cache_root("l4d2center-maps") == cache_root / "l4d2center-maps"
def test_safe_extract_zip_vpks_extracts_only_vpks(tmp_path):
    """Only .vpk members are extracted; other archive entries are skipped."""
    archive = tmp_path / "maps.zip"
    with ZipFile(archive, "w") as zf:
        zf.writestr("FatalFreight.vpk", b"vpk-bytes")
        zf.writestr("readme.txt", b"ignore")
    dest = tmp_path / "out"
    extracted = safe_extract_zip_vpks(archive, dest)
    assert extracted == [dest / "FatalFreight.vpk"]
    assert (dest / "FatalFreight.vpk").read_bytes() == b"vpk-bytes"
    assert not (dest / "readme.txt").exists()
def test_safe_extract_zip_vpks_rejects_path_traversal(tmp_path):
    """An archive member escaping the destination via '..' must raise ValueError."""
    archive = tmp_path / "bad.zip"
    with ZipFile(archive, "w") as zf:
        zf.writestr("../evil.vpk", b"bad")
    raised = False
    try:
        safe_extract_zip_vpks(archive, tmp_path / "out")
    except ValueError as exc:
        raised = True
        assert "unsafe archive member" in str(exc)
    if not raised:
        raise AssertionError("path traversal must fail")
def test_extracted_vpk_md5(tmp_path):
    """The md5 helper hashes file contents (md5 of b"abc")."""
    vpk = tmp_path / "x.vpk"
    vpk.write_bytes(b"abc")
    assert extracted_vpk_md5(vpk) == "900150983cd24fb0d6963f7d28e17f72"

View file

@ -0,0 +1,65 @@
from l4d2web.services.global_map_sources import (
GlobalMapManifestItem,
parse_cedapug_custom_html,
parse_l4d2center_csv,
)
def test_parse_l4d2center_csv_semicolon_manifest():
    """A well-formed semicolon-delimited CSV row becomes one manifest item."""
    raw = """Name;Size;md5;Download link
carriedoff.vpk;128660532;0380e12c57156574e17a96da1252cf21;https://l4d2center.com/maps/servers/carriedoff.7z
"""
    expected = GlobalMapManifestItem(
        item_key="carriedoff.vpk",
        display_name="carriedoff.vpk",
        download_url="https://l4d2center.com/maps/servers/carriedoff.7z",
        expected_vpk_name="carriedoff.vpk",
        expected_size=128660532,
        expected_md5="0380e12c57156574e17a96da1252cf21",
    )
    assert parse_l4d2center_csv(raw) == [expected]
def test_parse_l4d2center_rejects_missing_header():
    """CSV input without the expected header line must raise ValueError."""
    raised = False
    try:
        parse_l4d2center_csv("bad,data\n")
    except ValueError as exc:
        raised = True
        assert "Name;Size;md5;Download link" in str(exc)
    if not raised:
        raise AssertionError("bad header must fail")
def test_parse_cedapug_custom_html_extracts_relative_zip_links():
    """Only relative /maps/*.zip links become manifest items.

    Rows with no download link (built-in campaigns) and rows with absolute
    external URLs (e.g. Steam workshop) are ignored; relative links are
    resolved against https://cedapug.com. The payload uses JSON-escaped
    slashes (``\\/``) exactly as the upstream page embeds them.
    """
    html = """
<script>renderCustomMapDownloads([
["c1m1_hotel","<span style='color: #977d4c;'>Dead Center<\\/span>"],
["l4d2_ff01_woods","<span style='color: #854C34;'>Fatal Freight<\\/span>","\\/maps\\/FatalFreight.zip"],
["external","External","https://steamcommunity.com/sharedfiles/filedetails/?id=123"]
])</script>
"""
    items = parse_cedapug_custom_html(html)
    assert items == [
        GlobalMapManifestItem(
            item_key="FatalFreight.zip",
            display_name="Fatal Freight",
            download_url="https://cedapug.com/maps/FatalFreight.zip",
            # No size/md5 metadata is published for cedapug archives.
            expected_vpk_name="",
            expected_size=None,
            expected_md5="",
        )
    ]
def test_parse_cedapug_custom_html_rejects_missing_data():
    """HTML without the embedded renderCustomMapDownloads payload must fail."""
    raised = False
    try:
        parse_cedapug_custom_html("<html></html>")
    except ValueError as exc:
        raised = True
        assert "renderCustomMapDownloads" in str(exc)
    if not raised:
        raise AssertionError("missing embedded data must fail")

View file

@ -0,0 +1,89 @@
import os
from pathlib import Path
from l4d2web.db import init_db, session_scope
from l4d2web.models import GlobalOverlayItem, GlobalOverlayItemFile, GlobalOverlaySource, Overlay
from l4d2web.services.overlay_builders import BUILDERS
def seed_source(tmp_path: Path, monkeypatch) -> int:
    """Seed a minimal l4d2center overlay with one cached vpk; return overlay id.

    Points DATABASE_URL / LEFT4ME_ROOT at tmp_path, writes the cached vpk the
    builder is expected to symlink, and creates the
    Overlay -> GlobalOverlaySource -> GlobalOverlayItem -> GlobalOverlayItemFile
    chain in a single session, flushing after each insert to obtain ids.
    """
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'builder.db'}")
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    init_db()
    # The builder reconciles addons symlinks against this per-source vpk cache.
    cache_vpk = tmp_path / "global_overlay_cache" / "l4d2center-maps" / "vpks" / "carriedoff.vpk"
    cache_vpk.parent.mkdir(parents=True, exist_ok=True)
    cache_vpk.write_bytes(b"vpk")
    with session_scope() as db:
        overlay = Overlay(name="l4d2center-maps", path="7", type="l4d2center_maps", user_id=None)
        db.add(overlay)
        db.flush()
        source = GlobalOverlaySource(
            overlay_id=overlay.id,
            source_key="l4d2center-maps",
            source_type="l4d2center_csv",
            source_url="https://l4d2center.com/maps/servers/index.csv",
        )
        db.add(source)
        db.flush()
        item = GlobalOverlayItem(
            source_id=source.id,
            item_key="carriedoff.vpk",
            display_name="carriedoff.vpk",
            download_url="https://example.invalid/carriedoff.7z",
            expected_vpk_name="carriedoff.vpk",
        )
        db.add(item)
        db.flush()
        db.add(
            GlobalOverlayItemFile(
                item_id=item.id,
                vpk_name="carriedoff.vpk",
                # cache_path is relative to the global overlay cache root.
                cache_path="l4d2center-maps/vpks/carriedoff.vpk",
                size=3,
                md5="",
            )
        )
        db.flush()
        return overlay.id
def test_registry_contains_global_map_builders():
    """Both managed overlay types must be registered in the builder registry."""
    for overlay_type in ("l4d2center_maps", "cedapug_maps"):
        assert overlay_type in BUILDERS
def test_global_builder_creates_absolute_symlink(tmp_path, monkeypatch):
    """The builder links addons entries to absolute paths in the vpk cache."""
    overlay_id = seed_source(tmp_path, monkeypatch)
    stdout_lines: list[str] = []
    stderr_lines: list[str] = []
    with session_scope() as db:
        overlay = db.query(Overlay).filter_by(id=overlay_id).one()
        BUILDERS["l4d2center_maps"].build(
            overlay,
            on_stdout=stdout_lines.append,
            on_stderr=stderr_lines.append,
            should_cancel=lambda: False,
        )
    link = tmp_path / "overlays" / "7" / "left4dead2" / "addons" / "carriedoff.vpk"
    cache_vpk = tmp_path / "global_overlay_cache" / "l4d2center-maps" / "vpks" / "carriedoff.vpk"
    assert link.is_symlink()
    assert os.path.isabs(os.readlink(link))
    assert link.resolve() == cache_vpk.resolve()
    assert any("global overlay" in line for line in stdout_lines)
def test_global_builder_removes_obsolete_managed_symlink_but_keeps_foreign(tmp_path, monkeypatch):
    """Rebuilding after items vanish removes managed links but keeps foreign ones."""
    overlay_id = seed_source(tmp_path, monkeypatch)
    addons = tmp_path / "overlays" / "7" / "left4dead2" / "addons"
    addons.mkdir(parents=True, exist_ok=True)
    # A symlink pointing outside the global overlay cache must be left alone.
    foreign_target = tmp_path / "foreign.vpk"
    foreign_target.write_bytes(b"foreign")
    os.symlink(str(foreign_target), addons / "foreign.vpk")
    with session_scope() as db:
        overlay = db.query(Overlay).filter_by(id=overlay_id).one()
        # First build creates the managed carriedoff.vpk symlink.
        BUILDERS["l4d2center_maps"].build(overlay, on_stdout=lambda line: None, on_stderr=lambda line: None, should_cancel=lambda: False)
        # Drop all manifest items so the next build sees an empty desired set.
        source = db.query(GlobalOverlaySource).filter_by(source_key="l4d2center-maps").one()
        db.query(GlobalOverlayItem).filter_by(source_id=source.id).delete()
    with session_scope() as db:
        overlay = db.query(Overlay).filter_by(id=overlay_id).one()
        BUILDERS["l4d2center_maps"].build(overlay, on_stdout=lambda line: None, on_stderr=lambda line: None, should_cancel=lambda: False)
    assert not (addons / "carriedoff.vpk").exists()
    assert (addons / "foreign.vpk").is_symlink()

View file

@ -0,0 +1,19 @@
from l4d2web.app import create_app
from l4d2web.db import init_db, session_scope
from l4d2web.models import Job
def test_refresh_global_overlays_cli_enqueues_system_job(tmp_path, monkeypatch):
    """The CLI command queues a refresh job owned by no user (a system job)."""
    db_url = f"sqlite:///{tmp_path/'cli.db'}"
    monkeypatch.setenv("DATABASE_URL", db_url)
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    init_db()
    outcome = app.test_cli_runner().invoke(args=["refresh-global-overlays"])
    assert outcome.exit_code == 0
    assert "queued refresh_global_overlays job" in outcome.output
    with session_scope() as db:
        queued = db.query(Job).filter_by(operation="refresh_global_overlays").one()
        assert queued.user_id is None

View file

@ -0,0 +1,154 @@
from sqlalchemy.exc import IntegrityError
from l4d2web.db import init_db, session_scope
from l4d2web.models import (
GlobalOverlayItem,
GlobalOverlayItemFile,
GlobalOverlaySource,
Job,
Overlay,
User,
)
def test_system_job_allows_null_user_id(tmp_path, monkeypatch):
    """Job.user_id is nullable so system-originated jobs can be persisted."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'models.db'}")
    init_db()
    with session_scope() as db:
        system_job = Job(
            user_id=None,
            server_id=None,
            overlay_id=None,
            operation="refresh_global_overlays",
        )
        db.add(system_job)
        db.flush()
        assert system_job.id is not None
        assert system_job.user_id is None
def test_global_overlay_source_uniqueness(tmp_path, monkeypatch):
    """source_key is enforced unique across GlobalOverlaySource rows."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'sources.db'}")
    init_db()
    with session_scope() as db:
        overlay = Overlay(
            name="l4d2center-maps", path="1", type="l4d2center_maps", user_id=None
        )
        db.add(overlay)
        db.flush()
        db.add(
            GlobalOverlaySource(
                overlay_id=overlay.id,
                source_key="l4d2center-maps",
                source_type="l4d2center_csv",
                source_url="https://l4d2center.com/maps/servers/index.csv",
            )
        )
    # The duplicate source_key is expected to violate the unique constraint
    # when the second session commits on scope exit, raising IntegrityError.
    try:
        with session_scope() as db:
            other = Overlay(
                name="cedapug-maps", path="2", type="cedapug_maps", user_id=None
            )
            db.add(other)
            db.flush()
            db.add(
                GlobalOverlaySource(
                    overlay_id=other.id,
                    source_key="l4d2center-maps",
                    source_type="l4d2center_csv",
                    source_url="https://example.invalid/duplicate",
                )
            )
    except IntegrityError:
        pass
    else:
        raise AssertionError("duplicate source_key must fail")
def test_global_overlay_items_and_files_are_unique_per_parent(tmp_path, monkeypatch):
    """item_key is unique per source and vpk_name is unique per item."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'items.db'}")
    init_db()
    with session_scope() as db:
        overlay = Overlay(name="cedapug-maps", path="1", type="cedapug_maps", user_id=None)
        db.add(overlay)
        db.flush()
        source = GlobalOverlaySource(
            overlay_id=overlay.id,
            source_key="cedapug-maps",
            source_type="cedapug_custom_page",
            source_url="https://cedapug.com/custom",
        )
        db.add(source)
        db.flush()
        item = GlobalOverlayItem(
            source_id=source.id,
            item_key="FatalFreight.zip",
            display_name="Fatal Freight",
            download_url="https://cedapug.com/maps/FatalFreight.zip",
            expected_vpk_name="FatalFreight.vpk",
        )
        db.add(item)
        db.flush()
        db.add(
            GlobalOverlayItemFile(
                item_id=item.id,
                vpk_name="FatalFreight.vpk",
                cache_path="cedapug-maps/vpks/FatalFreight.vpk",
                size=123,
                md5="",
            )
        )
        # Capture the id before the session closes so the second half can
        # reference the item without re-querying.
        item_id = item.id
    # Duplicate item_key under the same source must fail when the session
    # commits on scope exit.
    try:
        with session_scope() as db:
            source = db.query(GlobalOverlaySource).filter_by(source_key="cedapug-maps").one()
            db.add(
                GlobalOverlayItem(
                    source_id=source.id,
                    item_key="FatalFreight.zip",
                    display_name="Fatal Freight duplicate",
                    download_url="https://cedapug.com/maps/FatalFreight.zip",
                    expected_vpk_name="FatalFreight.vpk",
                )
            )
    except IntegrityError:
        pass
    else:
        raise AssertionError("duplicate item_key per source must fail")
    # Duplicate vpk_name under the same item must fail the same way, even
    # when the cache_path differs.
    try:
        with session_scope() as db:
            db.add(
                GlobalOverlayItemFile(
                    item_id=item_id,
                    vpk_name="FatalFreight.vpk",
                    cache_path="cedapug-maps/vpks/FatalFreight-copy.vpk",
                    size=456,
                    md5="",
                )
            )
    except IntegrityError:
        pass
    else:
        raise AssertionError("duplicate vpk_name per item must fail")
def test_normal_user_rows_still_require_real_users(tmp_path, monkeypatch):
    """User-owned jobs continue to work with a concrete user_id."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'users.db'}")
    init_db()
    with session_scope() as db:
        alice = User(username="alice", password_digest="digest", admin=False)
        db.add(alice)
        db.flush()
        owned_job = Job(user_id=alice.id, server_id=None, operation="install", state="queued")
        db.add(owned_job)
        db.flush()
        assert owned_job.id is not None
        assert owned_job.user_id == alice.id

View file

@ -0,0 +1,69 @@
from pathlib import Path
from l4d2web.db import init_db, session_scope
from l4d2web.models import GlobalOverlayItem, GlobalOverlayItemFile, GlobalOverlaySource
from l4d2web.services.global_map_sources import GlobalMapManifestItem
def test_refresh_global_overlays_updates_manifest_items_and_invokes_builders(tmp_path, monkeypatch):
    """Refresh persists manifest rows, records cached files, rebuilds overlays.

    Both manifest fetchers and the download/extract step are stubbed so only
    the reconciliation logic inside refresh_global_overlays is exercised.
    """
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'refresh.db'}")
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    init_db()
    from l4d2web.services import global_overlay_refresh
    monkeypatch.setattr(
        global_overlay_refresh,
        "fetch_l4d2center_manifest",
        lambda: ("hash-center", [GlobalMapManifestItem("carriedoff.vpk", "carriedoff.vpk", "https://example.invalid/carriedoff.7z", "carriedoff.vpk", 3, "" )]),
    )
    monkeypatch.setattr(
        global_overlay_refresh,
        "fetch_cedapug_manifest",
        lambda: ("hash-ceda", [GlobalMapManifestItem("FatalFreight.zip", "Fatal Freight", "https://example.invalid/FatalFreight.zip")]),
    )
    def fake_download_and_extract(source_key, item, *, should_cancel):
        # Materialize a fake cached vpk and report it the way the real
        # downloader does: ([(name, cache_path, size, md5)], etag,
        # last_modified, total_size).
        target = tmp_path / "global_overlay_cache" / source_key / "vpks" / (item.expected_vpk_name or item.item_key.replace(".zip", ".vpk"))
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(b"vpk")
        return [(target.name, f"{source_key}/vpks/{target.name}", 3, "")], "etag", "last-modified", 3
    built: list[str] = []
    monkeypatch.setattr(global_overlay_refresh, "download_and_extract_item", fake_download_and_extract)
    monkeypatch.setattr(global_overlay_refresh, "build_global_overlay", lambda overlay, **kwargs: built.append(overlay.name))
    out: list[str] = []
    result = global_overlay_refresh.refresh_global_overlays(on_stdout=out.append, on_stderr=out.append, should_cancel=lambda: False)
    # Result lists refreshed source keys; both overlays are rebuilt and one
    # item + one file row exists per source.
    assert result == ["cedapug-maps", "l4d2center-maps"]
    assert set(built) == {"cedapug-maps", "l4d2center-maps"}
    with session_scope() as db:
        assert db.query(GlobalOverlaySource).count() == 2
        assert db.query(GlobalOverlayItem).count() == 2
        assert db.query(GlobalOverlayItemFile).count() == 2
def test_refresh_removes_items_absent_from_manifest(tmp_path, monkeypatch):
    """Items that disappear from the upstream manifest are deleted on refresh."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'remove.db'}")
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    init_db()
    from l4d2web.services.global_overlays import ensure_global_overlays
    from l4d2web.services import global_overlay_refresh
    with session_scope() as db:
        ensure_global_overlays(db)
        # Seed a stale item that the (stubbed, empty) manifest no longer lists.
        source = db.query(GlobalOverlaySource).filter_by(source_key="l4d2center-maps").one()
        item = GlobalOverlayItem(source_id=source.id, item_key="old.vpk", display_name="old.vpk", download_url="https://example.invalid/old.7z")
        db.add(item)
        db.flush()
        db.add(GlobalOverlayItemFile(item_id=item.id, vpk_name="old.vpk", cache_path="l4d2center-maps/vpks/old.vpk", size=3))
    # Empty manifests from both sources; builder stubbed to a no-op.
    monkeypatch.setattr(global_overlay_refresh, "fetch_l4d2center_manifest", lambda: ("empty-center", []))
    monkeypatch.setattr(global_overlay_refresh, "fetch_cedapug_manifest", lambda: ("empty-ceda", []))
    monkeypatch.setattr(global_overlay_refresh, "build_global_overlay", lambda overlay, **kwargs: None)
    global_overlay_refresh.refresh_global_overlays(on_stdout=lambda line: None, on_stderr=lambda line: None, should_cancel=lambda: False)
    with session_scope() as db:
        assert db.query(GlobalOverlayItem).filter_by(item_key="old.vpk").count() == 0

View file

@ -0,0 +1,167 @@
from sqlalchemy import select
from l4d2web.db import init_db, session_scope
from l4d2web.models import GlobalOverlaySource, Job, Overlay, User
from l4d2web.services.global_overlays import (
enqueue_refresh_global_overlays,
ensure_global_overlays,
is_creatable_overlay_type,
)
def test_ensure_global_overlays_creates_singletons_and_directories(tmp_path, monkeypatch):
    """First call creates both system overlays (rows + dirs); second is a no-op."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'global_overlays.db'}")
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    init_db()
    with session_scope() as session:
        created = ensure_global_overlays(session)
        assert created == {"cedapug-maps", "l4d2center-maps"}
        # Idempotent: a repeat call in the same session creates nothing new.
        second = ensure_global_overlays(session)
        assert second == set()
        overlays = session.scalars(select(Overlay).order_by(Overlay.name)).all()
        assert [overlay.name for overlay in overlays] == ["cedapug-maps", "l4d2center-maps"]
        assert [overlay.type for overlay in overlays] == ["cedapug_maps", "l4d2center_maps"]
        # System overlays have no owning user and distinct paths.
        assert [overlay.user_id for overlay in overlays] == [None, None]
        assert len({overlay.path for overlay in overlays}) == 2
        for overlay in overlays:
            # The overlay directory is created on disk under LEFT4ME_ROOT.
            assert (tmp_path / "overlays" / overlay.path).is_dir()
        sources = session.scalars(select(GlobalOverlaySource).order_by(GlobalOverlaySource.source_key)).all()
        assert [source.source_key for source in sources] == ["cedapug-maps", "l4d2center-maps"]
        assert [source.source_type for source in sources] == [
            "cedapug_custom_page",
            "l4d2center_csv",
        ]
def test_ensure_global_overlays_repairs_existing_rows(tmp_path, monkeypatch):
    """Existing system rows with wrong type/source data are repaired in place."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'global_overlay_repair.db'}")
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    init_db()
    with session_scope() as session:
        # Seed a legacy system overlay with a wrong type and a stale source row.
        overlay = Overlay(name="cedapug-maps", path="legacy", type="external", user_id=None)
        session.add(overlay)
        session.flush()
        session.add(
            GlobalOverlaySource(
                overlay_id=overlay.id,
                source_key="cedapug-maps",
                source_type="wrong",
                source_url="https://example.invalid/wrong",
            )
        )
    (tmp_path / "overlays" / "legacy").mkdir(parents=True)
    with session_scope() as session:
        created = ensure_global_overlays(session)
        # Only the missing l4d2center overlay is reported as newly created.
        assert created == {"l4d2center-maps"}
        repaired = session.scalar(select(Overlay).where(Overlay.name == "cedapug-maps"))
        assert repaired is not None
        assert repaired.type == "cedapug_maps"
        assert repaired.user_id is None
        assert (tmp_path / "overlays" / repaired.path).is_dir()
        source = session.scalar(
            select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == "cedapug-maps")
        )
        assert source is not None
        assert source.source_type == "cedapug_custom_page"
        assert source.source_url == "https://cedapug.com/custom"
def test_ensure_global_overlays_does_not_hijack_private_overlay_name(tmp_path, monkeypatch):
    """A user's private overlay sharing a system name must be left untouched.

    ensure_global_overlays is expected to create a separate system-owned row
    rather than converting the name-colliding private workshop overlay.
    """
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'global_overlay_private_name.db'}")
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    init_db()
    with session_scope() as session:
        user = User(username="alice", password_digest="digest", admin=False)
        session.add(user)
        session.flush()
        private = Overlay(
            name="l4d2center-maps",
            path="private-l4d2center",
            type="workshop",
            user_id=user.id,
        )
        session.add(private)
        session.flush()
        private_id = private.id
        private_user_id = user.id
    with session_scope() as session:
        created = ensure_global_overlays(session)
        assert created == {"cedapug-maps", "l4d2center-maps"}
        # The private overlay keeps its owner, type, and path.
        private = session.scalar(select(Overlay).where(Overlay.id == private_id))
        assert private is not None
        assert private.user_id == private_user_id
        assert private.type == "workshop"
        assert private.path == "private-l4d2center"
        # A distinct system-owned overlay now exists alongside it.
        system = session.scalar(
            select(Overlay).where(Overlay.name == "l4d2center-maps", Overlay.user_id.is_(None))
        )
        assert system is not None
        assert system.id != private_id
        assert system.type == "l4d2center_maps"
        assert (tmp_path / "overlays" / system.path).is_dir()
        # The source row points at the system overlay, not the private one.
        source = session.scalar(
            select(GlobalOverlaySource).where(GlobalOverlaySource.source_key == "l4d2center-maps")
        )
        assert source is not None
        assert source.overlay_id == system.id
def test_enqueue_refresh_global_overlays_coalesces_active_jobs(tmp_path, monkeypatch):
    """Enqueue returns the existing job while one is queued/running/cancelling."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'refresh_jobs.db'}")
    init_db()
    for state in ("queued", "running", "cancelling"):
        with session_scope() as session:
            # Start each iteration from a clean jobs table.
            session.query(Job).delete()
            existing = Job(
                user_id=7,
                server_id=None,
                overlay_id=None,
                operation="refresh_global_overlays",
                state=state,
            )
            session.add(existing)
            session.flush()
            existing_id = existing.id
            # Coalescing: no second refresh job is created for any active state.
            job = enqueue_refresh_global_overlays(session, user_id=None)
            assert job.id == existing_id
            assert session.query(Job).filter_by(operation="refresh_global_overlays").count() == 1
def test_enqueue_refresh_global_overlays_creates_system_job(tmp_path, monkeypatch):
    """With no active refresh job, enqueue creates a fresh queued system job."""
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'refresh_system_job.db'}")
    init_db()
    with session_scope() as session:
        queued = enqueue_refresh_global_overlays(session, user_id=None)
        assert queued.id is not None
        assert queued.user_id is None
        assert queued.server_id is None
        assert queued.overlay_id is None
        assert queued.operation == "refresh_global_overlays"
        assert queued.state == "queued"
def test_is_creatable_overlay_type_policy():
    """Workshop is open to everyone, external is admin-only, managed types never."""
    cases = [
        ("workshop", False, True),
        ("external", False, False),
        ("external", True, True),
        ("workshop", True, True),
        ("l4d2center_maps", True, False),
        ("cedapug_maps", True, False),
    ]
    for overlay_type, is_admin, expected in cases:
        assert is_creatable_overlay_type(overlay_type, admin=is_admin) is expected

View file

@ -104,3 +104,28 @@ def test_sse_js_handles_job_log_custom_events() -> None:
assert 'addEventListener("stdout"' in js
assert 'addEventListener("stderr"' in js
def test_system_job_logs_persist(tmp_path, monkeypatch):
    """append_job_log works for jobs that have no owning user."""
    from l4d2web.models import Job, JobLog
    from l4d2web.services.job_worker import append_job_log

    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'system-job-logs.db'}")
    init_db()
    with session_scope() as db:
        system_job = Job(
            user_id=None,
            server_id=None,
            operation="refresh_global_overlays",
            state="queued",
        )
        db.add(system_job)
        db.flush()
        sequence = append_job_log(db, system_job.id, "stdout", "queued by system timer")
        db.flush()
        log_row = db.query(JobLog).filter_by(job_id=system_job.id).one()
        assert sequence == 1
        assert log_row.line == "queued by system timer"

View file

@ -700,3 +700,48 @@ def test_refresh_job_enqueues_build_overlay_without_locking_its_final_log(
assert job.state == "succeeded"
assert build_job is not None
assert "enqueued build_overlay for 1 overlay(s)" in lines
def test_refresh_global_overlays_blocks_install_build_refresh_and_servers() -> None:
    """While a global refresh runs, no other job category may start."""
    from l4d2web.services.job_worker import SchedulerState, can_start

    state = SchedulerState(refresh_global_overlays_running=True)
    blocked_jobs = [
        DummyJob(operation="install"),
        DummyJob(operation="refresh_workshop_items"),
        DummyJob(operation="build_overlay", overlay_id=1),
        DummyJob(operation="start", server_id=1),
    ]
    for job in blocked_jobs:
        assert can_start(job, state) is False
def test_refresh_global_overlays_waits_for_active_work() -> None:
    """A global refresh may not start while any other work is active."""
    from l4d2web.services.job_worker import SchedulerState, can_start

    def refresh_job():
        return DummyJob(operation="refresh_global_overlays")

    assert can_start(refresh_job(), SchedulerState(install_running=True)) is False
    assert can_start(refresh_job(), SchedulerState(refresh_running=True)) is False
    busy_overlay = SchedulerState()
    busy_overlay.running_overlays.add(1)
    assert can_start(refresh_job(), busy_overlay) is False
    busy_server = SchedulerState()
    busy_server.running_servers.add(1)
    assert can_start(refresh_job(), busy_server) is False
def test_run_worker_once_dispatches_refresh_global_overlays(tmp_path, monkeypatch):
    """run_worker_once picks up a queued system refresh and calls the handler."""
    from l4d2web.services import job_worker
    from l4d2web.models import Job
    from l4d2web.db import session_scope
    called = []
    # Stub the refresh entry point; record the call and exercise the stdout
    # callback the worker is expected to wire up.
    def fake_refresh(*, on_stdout, on_stderr, should_cancel):
        called.append("refresh")
        on_stdout("global refresh complete")
        return ["l4d2center-maps"]
    monkeypatch.setattr(job_worker, "_run_refresh_global_overlays", fake_refresh)
    with session_scope() as db:
        job = Job(user_id=None, server_id=None, operation="refresh_global_overlays", state="queued")
        db.add(job)
    app, ids = seeded_worker
    assert job_worker.run_worker_once() is True
    assert called == ["refresh"]

View file

@ -258,3 +258,55 @@ def test_initialize_fails_fast_on_uncached_workshop_items(
assert str(overlay_id) in msg or "ws" in msg
# l4d2ctl initialize MUST NOT run when uncached items are present.
assert all("initialize" not in cmd for cmd in invocations), invocations
def test_initialize_fails_when_global_overlay_cache_file_missing(tmp_path, monkeypatch):
    """initialize_server must refuse to mount a global overlay with uncached vpks.

    Seeds a server whose blueprint uses a managed overlay that expects
    carriedoff.vpk in the cache but never writes that file; host commands
    are stubbed so nothing real runs.
    """
    from l4d2web.db import init_db, session_scope
    from l4d2web.models import (
        Blueprint,
        BlueprintOverlay,
        GlobalOverlayItem,
        GlobalOverlayItemFile,
        GlobalOverlaySource,
        Overlay,
        Server,
        User,
    )
    from l4d2web.services.l4d2_facade import initialize_server
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path/'facade-global.db'}")
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
    init_db()
    with session_scope() as db:
        user = User(username="alice", password_digest="digest")
        db.add(user)
        db.flush()
        overlay = Overlay(name="l4d2center-maps", path="7", type="l4d2center_maps", user_id=None)
        db.add(overlay)
        db.flush()
        source = GlobalOverlaySource(overlay_id=overlay.id, source_key="l4d2center-maps", source_type="l4d2center_csv", source_url="https://l4d2center.com/maps/servers/index.csv")
        db.add(source)
        db.flush()
        item = GlobalOverlayItem(source_id=source.id, item_key="carriedoff.vpk", display_name="carriedoff.vpk", download_url="https://example.invalid/carriedoff.7z")
        db.add(item)
        db.flush()
        # The cache row exists but the file at cache_path is never created.
        db.add(GlobalOverlayItemFile(item_id=item.id, vpk_name="carriedoff.vpk", cache_path="l4d2center-maps/vpks/carriedoff.vpk", size=123))
        blueprint = Blueprint(user_id=user.id, name="bp", arguments="[]", config="[]")
        db.add(blueprint)
        db.flush()
        db.add(BlueprintOverlay(blueprint_id=blueprint.id, overlay_id=overlay.id, position=0))
        server = Server(user_id=user.id, blueprint_id=blueprint.id, name="alpha", port=27015)
        db.add(server)
        db.flush()
        server_id = server.id
    # Neutralize host-side commands so the guard is the only observable effect.
    monkeypatch.setattr("l4d2web.services.host_commands.run_command", lambda *args, **kwargs: None)
    try:
        initialize_server(server_id)
    except RuntimeError as exc:
        # The error must name both the missing vpk and its source overlay.
        assert "carriedoff.vpk" in str(exc)
        assert "l4d2center-maps" in str(exc)
    else:
        raise AssertionError("missing global overlay cache file must fail")

View file

@ -2,7 +2,7 @@ import pytest
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Blueprint, BlueprintOverlay, Overlay, User
from l4d2web.models import Blueprint, BlueprintOverlay, GlobalOverlaySource, Overlay, User
from l4d2web.services.security import validate_overlay_ref
@ -60,6 +60,16 @@ def test_user_can_view_overlay_catalog(user_client_with_overlay) -> None:
assert "Create overlay" in text
def test_non_admin_can_view_managed_global_system_overlay(user_client_with_overlay) -> None:
    """Managed system overlays appear in the catalog for regular users."""
    _create_managed_global_overlay()
    response = user_client_with_overlay.get("/overlays")
    body = response.get_data(as_text=True)
    assert response.status_code == 200
    assert "l4d2center-maps" in body
def test_admin_can_view_overlay_edit_controls(admin_client) -> None:
response = admin_client.get("/overlays")
text = response.get_data(as_text=True)
@ -80,6 +90,16 @@ def test_admin_can_create_external_overlay(admin_client) -> None:
assert response.headers["Location"].startswith("/overlays/")
def test_admin_cannot_create_managed_global_overlay_type(admin_client) -> None:
    """Managed types cannot be created through the overlay form, even by admins."""
    response = admin_client.post(
        "/overlays",
        data={"name": "managed", "type": "l4d2center_maps"},
        headers={"X-CSRF-Token": "test-token"},
    )
    body = response.get_data(as_text=True)
    assert response.status_code == 400
    assert "unknown overlay type" in body
@pytest.mark.parametrize("overlay_ref", [" standard", "standard ", "a//b", "a/", "./a", "a/.", "."])
def test_overlay_ref_rejects_unsafe_components(overlay_ref: str) -> None:
with pytest.raises(ValueError):
@ -92,7 +112,7 @@ def test_non_admin_cannot_create_external_overlay(user_client_with_overlay) -> N
data={"name": "bad", "type": "external"},
headers={"X-CSRF-Token": "test-token"},
)
assert response.status_code == 403
assert response.status_code == 400
def test_user_can_create_workshop_overlay(user_client_with_overlay) -> None:
@ -184,6 +204,62 @@ def test_admin_can_update_and_delete_overlay(admin_client) -> None:
assert delete.status_code == 302
def _create_managed_global_overlay() -> int:
    """Insert a managed l4d2center overlay plus its source row; return its id."""
    with session_scope() as session:
        managed = Overlay(
            name="l4d2center-maps",
            path="managed-l4d2center",
            type="l4d2center_maps",
            user_id=None,
        )
        session.add(managed)
        session.flush()
        session.add(
            GlobalOverlaySource(
                overlay_id=managed.id,
                source_key="l4d2center-maps",
                source_type="l4d2center_csv",
                source_url="https://l4d2center.com/maps/servers/index.csv",
            )
        )
        return managed.id
def test_admin_cannot_update_managed_global_overlay(admin_client) -> None:
    """Renaming a managed overlay is forbidden even for admins (403)."""
    overlay_id = _create_managed_global_overlay()
    resp = admin_client.post(
        f"/overlays/{overlay_id}",
        data={"name": "renamed"},
        headers={"X-CSRF-Token": "test-token"},
    )
    assert resp.status_code == 403
def test_admin_cannot_delete_managed_global_overlay(admin_client) -> None:
    """Deleting a managed overlay is forbidden even for admins (403)."""
    overlay_id = _create_managed_global_overlay()
    resp = admin_client.post(
        f"/overlays/{overlay_id}/delete",
        headers={"X-CSRF-Token": "test-token"},
    )
    assert resp.status_code == 403
def test_admin_overlay_detail_hides_edit_for_managed_global_overlay(admin_client) -> None:
    """The detail page for a managed overlay omits the edit form and delete modal."""
    overlay_id = _create_managed_global_overlay()
    page = admin_client.get(f"/overlays/{overlay_id}")
    assert page.status_code == 200
    html = page.get_data(as_text=True)
    assert f'action="/overlays/{overlay_id}"' not in html
    assert "delete-overlay-modal" not in html
def test_update_overlay_rejects_duplicate_name(admin_client) -> None:
ids: list[int] = []
for name in ("standard", "competitive"):
@ -235,6 +311,56 @@ def test_overlay_detail_page_lists_using_blueprints(admin_client) -> None:
assert "Used by" in text
def test_non_admin_overlay_detail_only_lists_own_using_blueprints(user_client_with_overlay) -> None:
    """On a shared overlay's detail page a user sees only their own using blueprints."""
    overlay_id = _create_managed_global_overlay()
    with session_scope() as session:
        alice = session.query(User).filter_by(username="alice").one()
        mallory = User(username="mallory", password_digest=hash_password("secret"), admin=False)
        session.add(mallory)
        session.flush()
        blueprints = [
            Blueprint(user_id=alice.id, name="own-bp", arguments="[]", config="[]"),
            Blueprint(user_id=mallory.id, name="other-private-bp", arguments="[]", config="[]"),
        ]
        session.add_all(blueprints)
        session.flush()  # ids needed for the association rows
        for bp in blueprints:
            session.add(BlueprintOverlay(blueprint_id=bp.id, overlay_id=overlay_id, position=0))

    response = user_client_with_overlay.get(f"/overlays/{overlay_id}")
    assert response.status_code == 200
    html = response.get_data(as_text=True)
    assert "own-bp" in html
    assert "other-private-bp" not in html
def test_blueprint_edit_lists_system_and_owned_overlays_only(user_client_with_overlay) -> None:
    """Blueprint edit offers system overlays but hides other users' private overlays."""
    system_overlay_id = _create_managed_global_overlay()
    with session_scope() as session:
        alice = session.query(User).filter_by(username="alice").one()
        mallory = User(username="mallory", password_digest=hash_password("secret"), admin=False)
        session.add(mallory)
        session.flush()  # mallory.id needed for the foreign overlay
        private_overlay = Overlay(
            name="other-private-workshop",
            path="other-private-workshop",
            type="workshop",
            user_id=mallory.id,
        )
        alice_bp = Blueprint(user_id=alice.id, name="alice-bp", arguments="[]", config="[]")
        session.add_all([private_overlay, alice_bp])
        session.flush()
        blueprint_id = alice_bp.id

    page = user_client_with_overlay.get(f"/blueprints/{blueprint_id}")
    assert page.status_code == 200
    html = page.get_data(as_text=True)
    assert "l4d2center-maps" in html
    assert f'value="{system_overlay_id}"' in html
    assert "other-private-workshop" not in html
def test_overlay_detail_page_404_when_missing(admin_client) -> None:
    """Requesting a non-existent overlay id yields 404."""
    missing = admin_client.get("/overlays/999")
    assert missing.status_code == 404
@ -251,6 +377,36 @@ def test_overlay_detail_hides_edit_for_non_admin_external(user_client_with_overl
assert "delete-overlay-modal" not in text
def test_non_admin_cannot_view_other_users_private_non_workshop_overlay(user_client_with_overlay) -> None:
    """A user may not view another user's private external overlay."""
    with session_scope() as session:
        mallory = User(username="mallory", password_digest=hash_password("secret"), admin=False)
        session.add(mallory)
        session.flush()  # mallory.id needed before creating her overlay
        private_overlay = Overlay(
            name="private-external",
            path="private-external",
            type="external",
            user_id=mallory.id,
        )
        session.add(private_overlay)
        session.flush()
        overlay_id = private_overlay.id

    response = user_client_with_overlay.get(f"/overlays/{overlay_id}")
    assert response.status_code == 403
def test_managed_global_overlay_detail_shows_source_url(admin_client) -> None:
    """The managed overlay detail page surfaces the upstream source URL."""
    overlay_id = _create_managed_global_overlay()
    page = admin_client.get(f"/overlays/{overlay_id}")
    assert page.status_code == 200
    assert "https://l4d2center.com/maps/servers/index.csv" in page.get_data(as_text=True)
def test_overlay_update_redirects_to_detail(admin_client) -> None:
create = admin_client.post(
"/overlays",
@ -293,3 +449,9 @@ def test_delete_overlay_rejects_in_use_overlay(admin_client) -> None:
)
assert response.status_code == 409
def test_admin_can_enqueue_refresh_global_overlays(admin_client) -> None:
    """Admins can enqueue a global-overlay refresh; redirected to the admin job list."""
    # Consistency fix: added the `-> None` return annotation every sibling test carries.
    response = admin_client.post("/admin/global-overlays/refresh", headers={"X-CSRF-Token": "test-token"})
    assert response.status_code == 302
    assert response.headers["Location"] == "/admin/jobs"

View file

@ -457,3 +457,56 @@ def test_blueprint_detail_has_ordered_overlay_form(auth_client_with_server) -> N
assert 'name="config"' in text
assert 'name="overlay_ids"' in text
assert 'name="overlay_position_1"' in text
def test_admin_jobs_page_renders_system_job(tmp_path, monkeypatch) -> None:
    """A job with no owning user renders on the admin jobs page, attributed to "system"."""
    db_url = f"sqlite:///{tmp_path / 'admin-system-job.db'}"
    monkeypatch.setenv("DATABASE_URL", db_url)
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    init_db()

    with session_scope() as session:
        admin = User(username="admin", password_digest=hash_password("secret"), admin=True)
        session.add(admin)
        session.flush()  # admin.id needed for the client session
        admin_id = admin.id

    client = app.test_client()
    with client.session_transaction() as sess:
        sess["user_id"] = admin_id

    with session_scope() as session:
        session.add(Job(user_id=None, server_id=None, operation="refresh_global_overlays", state="queued"))

    page = client.get("/admin/jobs")
    assert page.status_code == 200
    html = page.get_data(as_text=True)
    assert "refresh_global_overlays" in html
    assert "system" in html
def test_non_admin_cannot_view_system_job(tmp_path, monkeypatch) -> None:
    """A regular user requesting a system job's detail page is refused with 403."""
    db_url = f"sqlite:///{tmp_path / 'non-admin-system-job.db'}"
    monkeypatch.setenv("DATABASE_URL", db_url)
    app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
    init_db()

    with session_scope() as session:
        alice = User(username="alice", password_digest=hash_password("secret"), admin=False)
        session.add(alice)
        session.flush()  # alice.id needed for the client session
        alice_id = alice.id

    client = app.test_client()
    with client.session_transaction() as sess:
        sess["user_id"] = alice_id

    with session_scope() as session:
        system_job = Job(user_id=None, server_id=None, operation="refresh_global_overlays", state="queued")
        session.add(system_job)
        session.flush()  # job id needed for the URL
        job_id = system_job.id

    response = client.get(f"/jobs/{job_id}")
    assert response.status_code == 403