Extends SchedulerState with running_overlays / refresh_running / blocked_servers_by_overlay, and updates can_start with the truth table: install and refresh_workshop_items are global mutexes; build_overlay serializes per-overlay; server jobs block on builds for any overlay their blueprint references. Adds enqueue_build_overlay coalescing helper that returns an existing queued job for the same overlay rather than inserting a duplicate. Adds run_job dispatch for build_overlay (BUILDERS[overlay.type].build) and refresh_workshop_items (re-fetches metadata, re-downloads on time_updated/filename change, enqueues coalesced rebuilds for affected overlays). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
589 lines
21 KiB
Python
589 lines
21 KiB
Python
from dataclasses import dataclass
|
|
from datetime import UTC, datetime, timedelta
|
|
from types import SimpleNamespace
|
|
import subprocess
|
|
|
|
import pytest
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.auth import hash_password
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import (
|
|
Blueprint,
|
|
BlueprintOverlay,
|
|
Job,
|
|
Overlay,
|
|
OverlayWorkshopItem,
|
|
Server,
|
|
User,
|
|
WorkshopItem,
|
|
)
|
|
from l4d2web.services import l4d2_facade
|
|
from l4d2web.services.host_commands import CommandCancelledError
|
|
from l4d2web.services.job_worker import (
|
|
SchedulerState,
|
|
build_scheduler_state,
|
|
can_start,
|
|
enqueue_build_overlay,
|
|
recover_stale_jobs,
|
|
run_worker_once,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class DummyJob:
|
|
operation: str
|
|
server_id: int | None = None
|
|
overlay_id: int | None = None
|
|
|
|
|
|
@pytest.fixture
|
|
def worker_app(tmp_path, monkeypatch):
|
|
from l4d2web.app import create_app
|
|
|
|
db_url = f"sqlite:///{tmp_path/'worker.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
return app
|
|
|
|
|
|
@pytest.fixture
|
|
def seeded_worker(worker_app):
|
|
with session_scope() as session:
|
|
user = User(username="alice", password_digest=hash_password("secret"), admin=False)
|
|
session.add(user)
|
|
session.flush()
|
|
|
|
blueprint = Blueprint(user_id=user.id, name="default", arguments="[]", config="[]")
|
|
session.add(blueprint)
|
|
session.flush()
|
|
|
|
server_one = Server(
|
|
user_id=user.id,
|
|
blueprint_id=blueprint.id,
|
|
name="alpha",
|
|
port=27015,
|
|
last_error="old error",
|
|
)
|
|
server_two = Server(user_id=user.id, blueprint_id=blueprint.id, name="bravo", port=27016)
|
|
session.add_all([server_one, server_two])
|
|
session.flush()
|
|
|
|
ids = SimpleNamespace(user=user.id, server_one=server_one.id, server_two=server_two.id)
|
|
|
|
return worker_app, ids
|
|
|
|
|
|
def add_job(
|
|
user_id: int,
|
|
operation: str,
|
|
*,
|
|
server_id: int | None,
|
|
state: str = "queued",
|
|
created_at: datetime | None = None,
|
|
overlay_id: int | None = None,
|
|
) -> int:
|
|
now = datetime.now(UTC)
|
|
with session_scope() as session:
|
|
job = Job(
|
|
user_id=user_id,
|
|
server_id=server_id,
|
|
overlay_id=overlay_id,
|
|
operation=operation,
|
|
state=state,
|
|
created_at=created_at or now,
|
|
updated_at=created_at or now,
|
|
)
|
|
if state == "running":
|
|
job.started_at = now
|
|
session.add(job)
|
|
session.flush()
|
|
return job.id
|
|
|
|
|
|
def load_job(job_id: int) -> Job:
|
|
with session_scope() as session:
|
|
job = session.scalar(select(Job).where(Job.id == job_id))
|
|
assert job is not None
|
|
return job
|
|
|
|
|
|
def test_scheduler_predicates() -> None:
|
|
state = SchedulerState()
|
|
state.running_servers.add(1)
|
|
|
|
assert can_start(DummyJob(operation="start", server_id=1), state) is False
|
|
assert can_start(DummyJob(operation="start", server_id=2), state) is True
|
|
assert can_start(DummyJob(operation="install", server_id=None), state) is False
|
|
|
|
|
|
def test_run_worker_once_claims_oldest_runnable_job(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
calls = []
|
|
older = datetime.now(UTC) - timedelta(minutes=2)
|
|
newer = datetime.now(UTC)
|
|
old_job_id = add_job(ids.user, "initialize", server_id=ids.server_one, created_at=older)
|
|
new_job_id = add_job(ids.user, "initialize", server_id=ids.server_two, created_at=newer)
|
|
|
|
monkeypatch.setattr(l4d2_facade, "initialize_server", lambda server_id, **kwargs: calls.append(server_id))
|
|
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
assert calls == [ids.server_one]
|
|
assert load_job(old_job_id).state == "succeeded"
|
|
assert load_job(new_job_id).state == "queued"
|
|
|
|
|
|
def test_successful_start_job_logs_and_refreshes_server_state(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
job_id = add_job(ids.user, "start", server_id=ids.server_one)
|
|
calls = []
|
|
|
|
def fake_initialize(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
|
|
del should_cancel
|
|
calls.append(("initialize", server_id))
|
|
on_stdout("Step: creating instance directories...")
|
|
on_stderr("init warning")
|
|
|
|
def fake_start(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
|
|
del should_cancel
|
|
calls.append(("start", server_id))
|
|
on_stdout("Step: mounting runtime overlay...")
|
|
|
|
monkeypatch.setattr(l4d2_facade, "initialize_server", fake_initialize)
|
|
monkeypatch.setattr(l4d2_facade, "start_server", fake_start)
|
|
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="running"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
with session_scope() as session:
|
|
job = session.scalar(select(Job).where(Job.id == job_id))
|
|
server = session.scalar(select(Server).where(Server.id == ids.server_one))
|
|
lines = [(row.seq, row.stream, row.line) for row in job_logs_for(session, job_id)]
|
|
|
|
assert calls == [("initialize", ids.server_one), ("start", ids.server_one)]
|
|
assert job is not None
|
|
assert job.state == "succeeded"
|
|
assert job.exit_code == 0
|
|
assert job.started_at is not None
|
|
assert job.finished_at is not None
|
|
assert job.updated_at is not None
|
|
assert lines == [
|
|
(1, "stdout", "starting initialize for alpha"),
|
|
(2, "stdout", "Step: creating instance directories..."),
|
|
(3, "stderr", "init warning"),
|
|
(4, "stdout", "finished initialize successfully"),
|
|
(5, "stdout", "starting start for alpha"),
|
|
(6, "stdout", "Step: mounting runtime overlay..."),
|
|
(7, "stdout", "finished start successfully"),
|
|
]
|
|
assert server is not None
|
|
assert server.actual_state == "running"
|
|
assert server.last_error == ""
|
|
|
|
|
|
def job_logs_for(session, job_id: int):
|
|
from l4d2web.models import JobLog
|
|
|
|
return session.scalars(select(JobLog).where(JobLog.job_id == job_id).order_by(JobLog.seq)).all()
|
|
|
|
|
|
def test_called_process_error_fails_job_and_sets_server_error(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
job_id = add_job(ids.user, "stop", server_id=ids.server_one)
|
|
|
|
def fail_stop(server_id, **kwargs):
|
|
raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")
|
|
|
|
monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
|
|
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
with session_scope() as session:
|
|
job = session.scalar(select(Job).where(Job.id == job_id))
|
|
server = session.scalar(select(Server).where(Server.id == ids.server_one))
|
|
lines = [row.line for row in job_logs_for(session, job_id)]
|
|
|
|
assert job is not None
|
|
assert job.state == "failed"
|
|
assert job.exit_code == 7
|
|
assert server is not None
|
|
assert server.last_error == "stop failed"
|
|
assert "starting stop for alpha" in lines
|
|
assert "stop failed" in lines
|
|
|
|
|
|
def test_refresh_failure_does_not_hide_operation_failure(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
job_id = add_job(ids.user, "stop", server_id=ids.server_one)
|
|
|
|
def fail_stop(server_id, **kwargs):
|
|
raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")
|
|
|
|
monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
|
|
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: (_ for _ in ()).throw(RuntimeError("status down")))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
with session_scope() as session:
|
|
job = session.scalar(select(Job).where(Job.id == job_id))
|
|
server = session.scalar(select(Server).where(Server.id == ids.server_one))
|
|
lines = [row.line for row in job_logs_for(session, job_id)]
|
|
|
|
assert job is not None
|
|
assert job.state == "failed"
|
|
assert job.exit_code == 7
|
|
assert server is not None
|
|
assert server.last_error == "stop failed"
|
|
assert "starting stop for alpha" in lines
|
|
assert "stop failed" in lines
|
|
assert "status refresh failed: status down" in lines
|
|
|
|
|
|
def test_unexpected_exception_fails_job_with_exit_code_one(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
job_id = add_job(ids.user, "delete", server_id=ids.server_one)
|
|
|
|
monkeypatch.setattr(l4d2_facade, "delete_server", lambda server_id, **kwargs: (_ for _ in ()).throw(RuntimeError("boom")))
|
|
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
job = load_job(job_id)
|
|
assert job.state == "failed"
|
|
assert job.exit_code == 1
|
|
|
|
|
|
def test_same_server_jobs_do_not_overlap(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
add_job(ids.user, "start", server_id=ids.server_one, state="running")
|
|
queued_id = add_job(ids.user, "stop", server_id=ids.server_one)
|
|
monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is False
|
|
|
|
assert load_job(queued_id).state == "queued"
|
|
|
|
|
|
def test_same_server_jobs_wait_while_job_is_cancelling(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
add_job(ids.user, "start", server_id=ids.server_one, state="cancelling")
|
|
queued_id = add_job(ids.user, "stop", server_id=ids.server_one)
|
|
monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: pytest.fail("must not run"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is False
|
|
|
|
assert load_job(queued_id).state == "queued"
|
|
|
|
|
|
def test_cancelled_process_finishes_job_as_cancelled(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
job_id = add_job(ids.user, "stop", server_id=ids.server_one)
|
|
|
|
def fake_stop(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
|
|
assert server_id == ids.server_one
|
|
assert should_cancel is not None
|
|
with session_scope() as session:
|
|
job = session.scalar(select(Job).where(Job.id == job_id))
|
|
assert job is not None
|
|
job.state = "cancelling"
|
|
assert should_cancel() is True
|
|
on_stderr("terminating")
|
|
raise CommandCancelledError(returncode=-15, cmd=["stop"], output="", stderr="")
|
|
|
|
monkeypatch.setattr(l4d2_facade, "stop_server", fake_stop)
|
|
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="unknown"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
with session_scope() as session:
|
|
job = session.scalar(select(Job).where(Job.id == job_id))
|
|
server = session.scalar(select(Server).where(Server.id == ids.server_one))
|
|
lines = [row.line for row in job_logs_for(session, job_id)]
|
|
|
|
assert job is not None
|
|
assert job.state == "cancelled"
|
|
assert job.exit_code == -15
|
|
assert job.finished_at is not None
|
|
assert server is not None
|
|
assert server.last_error == "job cancelled; runtime state may be partial"
|
|
assert "starting stop for alpha" in lines
|
|
assert "terminating" in lines
|
|
assert "job cancelled; runtime state may be partial" in lines
|
|
|
|
|
|
def test_different_server_jobs_can_be_claimed_while_other_server_runs(seeded_worker, monkeypatch) -> None:
|
|
app, ids = seeded_worker
|
|
add_job(ids.user, "start", server_id=ids.server_one, state="running")
|
|
queued_id = add_job(ids.user, "stop", server_id=ids.server_two)
|
|
monkeypatch.setattr(l4d2_facade, "stop_server", lambda server_id, **kwargs: None)
|
|
monkeypatch.setattr(l4d2_facade, "server_status", lambda name: SimpleNamespace(state="stopped"))
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
assert load_job(queued_id).state == "succeeded"
|
|
|
|
|
|
def test_install_job_not_claimed_while_server_job_runs(seeded_worker) -> None:
|
|
app, ids = seeded_worker
|
|
add_job(ids.user, "start", server_id=ids.server_one, state="running")
|
|
install_id = add_job(ids.user, "install", server_id=None)
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is False
|
|
|
|
assert load_job(install_id).state == "queued"
|
|
|
|
|
|
def test_server_job_not_claimed_while_install_runs(seeded_worker) -> None:
|
|
app, ids = seeded_worker
|
|
add_job(ids.user, "install", server_id=None, state="running")
|
|
queued_id = add_job(ids.user, "initialize", server_id=ids.server_one)
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is False
|
|
|
|
assert load_job(queued_id).state == "queued"
|
|
|
|
|
|
def test_recover_stale_running_jobs(worker_app) -> None:
|
|
with session_scope() as session:
|
|
user = User(username="bob", password_digest=hash_password("secret"), admin=False)
|
|
session.add(user)
|
|
session.flush()
|
|
job = Job(user_id=user.id, server_id=None, operation="install", state="running")
|
|
session.add(job)
|
|
session.flush()
|
|
job_id = job.id
|
|
|
|
with worker_app.app_context():
|
|
assert recover_stale_jobs() == 1
|
|
|
|
recovered = load_job(job_id)
|
|
assert recovered.state == "failed"
|
|
assert recovered.finished_at is not None
|
|
|
|
|
|
def test_worker_startup_skipped_during_testing(monkeypatch, tmp_path) -> None:
|
|
from l4d2web import app as app_module
|
|
|
|
called = []
|
|
db_url = f"sqlite:///{tmp_path/'startup.db'}"
|
|
monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))
|
|
|
|
app_module.create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
|
|
assert called == []
|
|
|
|
|
|
def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> None:
|
|
from l4d2web import app as app_module
|
|
|
|
called = []
|
|
db_url = f"sqlite:///{tmp_path/'startup-enabled.db'}"
|
|
monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))
|
|
|
|
app = app_module.create_app({"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
|
|
assert called == [app]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Scheduler truth table for the new operations (build_overlay,
|
|
# refresh_workshop_items) and their interaction with existing ops.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_install_blocks_build_overlay_and_refresh() -> None:
|
|
state = SchedulerState(install_running=True)
|
|
assert can_start(DummyJob(operation="build_overlay", overlay_id=1), state) is False
|
|
assert can_start(DummyJob(operation="refresh_workshop_items"), state) is False
|
|
|
|
|
|
def test_refresh_blocks_install_build_overlay_and_servers() -> None:
|
|
state = SchedulerState(refresh_running=True)
|
|
assert can_start(DummyJob(operation="install"), state) is False
|
|
assert can_start(DummyJob(operation="build_overlay", overlay_id=1), state) is False
|
|
assert can_start(DummyJob(operation="start", server_id=1), state) is False
|
|
|
|
|
|
def test_build_overlay_blocks_same_overlay_only() -> None:
|
|
state = SchedulerState()
|
|
state.running_overlays.add(7)
|
|
assert can_start(DummyJob(operation="build_overlay", overlay_id=7), state) is False
|
|
assert can_start(DummyJob(operation="build_overlay", overlay_id=8), state) is True
|
|
|
|
|
|
def test_install_blocked_by_active_build_overlay() -> None:
|
|
state = SchedulerState()
|
|
state.running_overlays.add(7)
|
|
assert can_start(DummyJob(operation="install"), state) is False
|
|
|
|
|
|
def test_refresh_blocked_by_active_build_overlay() -> None:
|
|
state = SchedulerState()
|
|
state.running_overlays.add(7)
|
|
assert can_start(DummyJob(operation="refresh_workshop_items"), state) is False
|
|
|
|
|
|
def test_server_job_blocked_when_blueprint_overlay_is_building() -> None:
|
|
state = SchedulerState()
|
|
state.running_overlays.add(7)
|
|
state.blocked_servers_by_overlay.add(42)
|
|
assert can_start(DummyJob(operation="start", server_id=42), state) is False
|
|
# Other servers (whose blueprints don't reference overlay 7) are NOT blocked.
|
|
assert can_start(DummyJob(operation="start", server_id=43), state) is True
|
|
|
|
|
|
@pytest.fixture
|
|
def overlay_seeded_worker(seeded_worker):
|
|
app, ids = seeded_worker
|
|
with session_scope() as s:
|
|
overlay = Overlay(name="ws", path="9", type="workshop", user_id=ids.user)
|
|
s.add(overlay)
|
|
s.flush()
|
|
# Move server_two onto a different blueprint with NO workshop overlay,
|
|
# so the test can distinguish "blocked by overlay build" from "any
|
|
# server is blocked".
|
|
bp_with_overlay = s.scalar(select(Blueprint).where(Blueprint.user_id == ids.user))
|
|
s.add(BlueprintOverlay(blueprint_id=bp_with_overlay.id, overlay_id=overlay.id, position=0))
|
|
bp_without = Blueprint(user_id=ids.user, name="no-overlay", arguments="[]", config="[]")
|
|
s.add(bp_without)
|
|
s.flush()
|
|
server_two = s.scalar(select(Server).where(Server.id == ids.server_two))
|
|
server_two.blueprint_id = bp_without.id
|
|
ids.overlay = overlay.id
|
|
return app, ids
|
|
|
|
|
|
def test_scheduler_state_finds_servers_blocked_by_running_build(overlay_seeded_worker) -> None:
|
|
app, ids = overlay_seeded_worker
|
|
add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
|
|
|
|
with session_scope() as s:
|
|
state = build_scheduler_state(s)
|
|
|
|
assert ids.overlay in state.running_overlays
|
|
assert ids.server_one in state.blocked_servers_by_overlay
|
|
assert ids.server_two not in state.blocked_servers_by_overlay
|
|
|
|
|
|
def test_enqueue_build_overlay_creates_new_job_when_none_pending(overlay_seeded_worker) -> None:
|
|
app, ids = overlay_seeded_worker
|
|
|
|
with session_scope() as s:
|
|
job = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
|
|
assert job.operation == "build_overlay"
|
|
assert job.overlay_id == ids.overlay
|
|
assert job.server_id is None
|
|
assert job.state == "queued"
|
|
|
|
|
|
def test_enqueue_build_overlay_coalesces_against_pending(overlay_seeded_worker) -> None:
|
|
app, ids = overlay_seeded_worker
|
|
|
|
with session_scope() as s:
|
|
first = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
|
|
first_id = first.id
|
|
|
|
with session_scope() as s:
|
|
second = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
|
|
assert second.id == first_id, "should coalesce against the pending job"
|
|
|
|
with session_scope() as s:
|
|
n = s.query(Job).filter_by(operation="build_overlay", overlay_id=ids.overlay).count()
|
|
assert n == 1
|
|
|
|
|
|
def test_enqueue_build_overlay_does_not_coalesce_against_running(overlay_seeded_worker) -> None:
|
|
app, ids = overlay_seeded_worker
|
|
add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)
|
|
|
|
with session_scope() as s:
|
|
new_job = enqueue_build_overlay(s, overlay_id=ids.overlay, user_id=ids.user)
|
|
assert new_job.state == "queued"
|
|
|
|
with session_scope() as s:
|
|
running = s.scalars(
|
|
select(Job).where(
|
|
Job.operation == "build_overlay",
|
|
Job.overlay_id == ids.overlay,
|
|
Job.state == "running",
|
|
)
|
|
).all()
|
|
queued = s.scalars(
|
|
select(Job).where(
|
|
Job.operation == "build_overlay",
|
|
Job.overlay_id == ids.overlay,
|
|
Job.state == "queued",
|
|
)
|
|
).all()
|
|
assert len(running) == 1
|
|
assert len(queued) == 1
|
|
|
|
|
|
def test_run_worker_once_dispatches_build_overlay(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
|
|
app, ids = overlay_seeded_worker
|
|
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
|
|
|
with session_scope() as s:
|
|
wi = WorkshopItem(steam_id="1001", title="A", filename="a.vpk", file_url="u", file_size=3, time_updated=1)
|
|
s.add(wi)
|
|
s.flush()
|
|
s.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=wi.id))
|
|
cache = tmp_path / "workshop_cache"
|
|
cache.mkdir()
|
|
(cache / "1001.vpk").write_bytes(b"abc")
|
|
import os
|
|
os.utime(cache / "1001.vpk", (1, 1))
|
|
# Mark item as downloaded.
|
|
with session_scope() as s:
|
|
wi = s.query(WorkshopItem).filter_by(steam_id="1001").one()
|
|
wi.last_downloaded_at = datetime.now(UTC)
|
|
|
|
job_id = add_job(ids.user, "build_overlay", server_id=None, overlay_id=ids.overlay)
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
job = load_job(job_id)
|
|
assert job.state == "succeeded", job.exit_code
|
|
addons = tmp_path / "overlays" / "9" / "left4dead2" / "addons"
|
|
assert (addons / "1001.vpk").is_symlink()
|
|
|
|
|
|
def test_run_worker_once_dispatches_refresh(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
|
|
app, ids = overlay_seeded_worker
|
|
monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))
|
|
|
|
from l4d2web.services import steam_workshop, job_worker
|
|
|
|
refresh_calls = []
|
|
|
|
def fake_refresh_workshop_items(*, on_stdout, on_stderr, should_cancel):
|
|
refresh_calls.append(True)
|
|
on_stdout("refresh phase complete (downloaded=0 errors=0)")
|
|
return [] # no overlays affected
|
|
|
|
monkeypatch.setattr(job_worker, "_run_refresh_workshop_items", fake_refresh_workshop_items)
|
|
|
|
job_id = add_job(ids.user, "refresh_workshop_items", server_id=None)
|
|
|
|
with app.app_context():
|
|
assert run_worker_once() is True
|
|
|
|
assert refresh_calls == [True]
|
|
job = load_job(job_id)
|
|
assert job.state == "succeeded"
|