# Limit workshop refresh downloads to one worker and commit build-overlay enqueue work before writing the final job log so SQLite locks do not wedge the web process.
# 702 lines, 25 KiB, Python
from dataclasses import dataclass
|
|
from datetime import UTC, datetime, timedelta
|
|
from types import SimpleNamespace
|
|
import subprocess
|
|
|
|
import pytest
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.auth import hash_password
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import (
|
|
Blueprint,
|
|
BlueprintOverlay,
|
|
Job,
|
|
Overlay,
|
|
OverlayWorkshopItem,
|
|
Server,
|
|
User,
|
|
WorkshopItem,
|
|
)
|
|
from l4d2web.services import l4d2_facade
|
|
from l4d2web.services.host_commands import CommandCancelledError
|
|
from l4d2web.services.job_worker import (
|
|
SchedulerState,
|
|
build_scheduler_state,
|
|
can_start,
|
|
enqueue_build_overlay,
|
|
recover_stale_jobs,
|
|
run_worker_once,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class DummyJob:
|
|
operation: str
|
|
server_id: int | None = None
|
|
overlay_id: int | None = None
|
|
|
|
|
|
@pytest.fixture
def worker_app(tmp_path, monkeypatch):
    """Create a TESTING app whose database is a SQLite file under ``tmp_path``.

    The same URL is exported through the ``DATABASE_URL`` environment variable
    and passed in the app config before ``init_db()`` is called.
    """
    from l4d2web.app import create_app

    database_file = tmp_path / "worker.db"
    db_url = f"sqlite:///{database_file}"
    monkeypatch.setenv("DATABASE_URL", db_url)

    settings = {"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}
    app = create_app(settings)
    init_db()
    return app
|
|
|
|
|
|
@pytest.fixture
def seeded_worker(worker_app):
    """Seed one user, one blueprint and two servers.

    Returns ``(worker_app, ids)`` where ``ids`` exposes ``user``,
    ``server_one`` and ``server_two`` primary keys.
    """
    with session_scope() as db:
        owner = User(username="alice", password_digest=hash_password("secret"), admin=False)
        db.add(owner)
        db.flush()  # owner.id is read right below

        default_blueprint = Blueprint(
            user_id=owner.id,
            name="default",
            arguments="[]",
            config="[]",
        )
        db.add(default_blueprint)
        db.flush()

        # "alpha" starts with a stale error so tests can observe it being cleared.
        alpha = Server(
            user_id=owner.id,
            blueprint_id=default_blueprint.id,
            name="alpha",
            port=27015,
            last_error="old error",
        )
        bravo = Server(
            user_id=owner.id,
            blueprint_id=default_blueprint.id,
            name="bravo",
            port=27016,
        )
        db.add_all([alpha, bravo])
        db.flush()

        ids = SimpleNamespace(user=owner.id, server_one=alpha.id, server_two=bravo.id)

    return worker_app, ids
|
|
|
|
|
|
def add_job(
    user_id: int,
    operation: str,
    *,
    server_id: int | None,
    state: str = "queued",
    created_at: datetime | None = None,
    overlay_id: int | None = None,
) -> int:
    """Insert a ``Job`` row and return its id.

    ``created_at`` (or the current UTC time when omitted) is used for both
    ``created_at`` and ``updated_at``. Jobs inserted in the ``"running"``
    state also get ``started_at`` set to the current time.
    """
    now = datetime.now(UTC)
    stamp = created_at or now
    with session_scope() as session:
        row = Job(
            user_id=user_id,
            server_id=server_id,
            overlay_id=overlay_id,
            operation=operation,
            state=state,
            created_at=stamp,
            updated_at=stamp,
        )
        if state == "running":
            row.started_at = now
        session.add(row)
        session.flush()  # populate row.id before returning it
        return row.id
|
|
|
|
|
|
def load_job(job_id: int) -> Job:
    """Fetch the ``Job`` with the given id, asserting that it exists."""
    lookup = select(Job).where(Job.id == job_id)
    with session_scope() as session:
        found = session.scalar(lookup)
        assert found is not None
        return found
|
|
|
|
|
|
def test_scheduler_predicates() -> None:
    """With server 1 busy: server 1 is blocked, server 2 may start, install is blocked."""
    state = SchedulerState()
    state.running_servers.add(1)

    same_server = DummyJob(operation="start", server_id=1)
    other_server = DummyJob(operation="start", server_id=2)
    install = DummyJob(operation="install", server_id=None)

    assert can_start(same_server, state) is False
    assert can_start(other_server, state) is True
    assert can_start(install, state) is False
|
|
|
|
|
|
def test_run_worker_once_claims_oldest_runnable_job(seeded_worker, monkeypatch) -> None:
    """One worker pass runs only the older of two queued initialize jobs."""
    app, ids = seeded_worker
    calls = []
    two_minutes_ago = datetime.now(UTC) - timedelta(minutes=2)
    just_now = datetime.now(UTC)
    old_job_id = add_job(ids.user, "initialize", server_id=ids.server_one, created_at=two_minutes_ago)
    new_job_id = add_job(ids.user, "initialize", server_id=ids.server_two, created_at=just_now)

    def record_initialize(server_id, **kwargs):
        calls.append(server_id)

    def stopped_status(name):
        return SimpleNamespace(state="stopped")

    monkeypatch.setattr(l4d2_facade, "initialize_server", record_initialize)
    monkeypatch.setattr(l4d2_facade, "server_status", stopped_status)

    with app.app_context():
        assert run_worker_once() is True

    assert calls == [ids.server_one]
    assert load_job(old_job_id).state == "succeeded"
    assert load_job(new_job_id).state == "queued"
|
|
|
|
|
|
def test_successful_start_job_logs_and_refreshes_server_state(seeded_worker, monkeypatch) -> None:
    """A ``start`` job runs initialize then start, logs both phases in order,
    succeeds with exit code 0, and updates the server's state and error."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "start", server_id=ids.server_one)
    calls = []

    def fake_initialize(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        del should_cancel
        calls.append(("initialize", server_id))
        on_stdout("Step: creating instance directories...")
        on_stderr("init warning")

    def fake_start(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        del should_cancel
        calls.append(("start", server_id))
        on_stdout("Step: mounting runtime overlay...")

    def running_status(name):
        return SimpleNamespace(state="running")

    patches = (
        ("initialize_server", fake_initialize),
        ("start_server", fake_start),
        ("server_status", running_status),
    )
    for attribute, replacement in patches:
        monkeypatch.setattr(l4d2_facade, attribute, replacement)

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [(row.seq, row.stream, row.line) for row in job_logs_for(session, job_id)]

    expected_lines = [
        (1, "stdout", "starting initialize for alpha"),
        (2, "stdout", "Step: creating instance directories..."),
        (3, "stderr", "init warning"),
        (4, "stdout", "finished initialize successfully"),
        (5, "stdout", "starting start for alpha"),
        (6, "stdout", "Step: mounting runtime overlay..."),
        (7, "stdout", "finished start successfully"),
    ]

    assert calls == [("initialize", ids.server_one), ("start", ids.server_one)]
    assert job is not None
    assert job.state == "succeeded"
    assert job.exit_code == 0
    assert job.started_at is not None
    assert job.finished_at is not None
    assert job.updated_at is not None
    assert lines == expected_lines
    assert server is not None
    assert server.actual_state == "running"
    # The seeded "old error" must have been cleared.
    assert server.last_error == ""
|
|
|
|
|
|
def job_logs_for(session, job_id: int):
    """Return every ``JobLog`` row for ``job_id`` ordered by ``seq``."""
    from l4d2web.models import JobLog

    ordered_logs = select(JobLog).where(JobLog.job_id == job_id).order_by(JobLog.seq)
    return session.scalars(ordered_logs).all()
|
|
|
|
|
|
def test_called_process_error_fails_job_and_sets_server_error(seeded_worker, monkeypatch) -> None:
    """A ``CalledProcessError`` from stop fails the job with its return code
    and copies stderr into the server's ``last_error`` and the job log."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fail_stop(server_id, **kwargs):
        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")

    def stopped_status(name):
        return SimpleNamespace(state="stopped")

    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", stopped_status)

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [entry.line for entry in job_logs_for(session, job_id)]

    assert job is not None
    assert job.state == "failed"
    assert job.exit_code == 7
    assert server is not None
    assert server.last_error == "stop failed"
    for expected in ("starting stop for alpha", "stop failed"):
        assert expected in lines
|
|
|
|
|
|
def test_refresh_failure_does_not_hide_operation_failure(seeded_worker, monkeypatch) -> None:
    """When both the operation and the status refresh fail, the job keeps the
    operation's exit code and error, and the refresh failure is logged too."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fail_stop(server_id, **kwargs):
        raise subprocess.CalledProcessError(returncode=7, cmd=["stop"], stderr="stop failed")

    def broken_status(name):
        raise RuntimeError("status down")

    monkeypatch.setattr(l4d2_facade, "stop_server", fail_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", broken_status)

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [entry.line for entry in job_logs_for(session, job_id)]

    assert job is not None
    assert job.state == "failed"
    assert job.exit_code == 7
    assert server is not None
    assert server.last_error == "stop failed"
    assert "starting stop for alpha" in lines
    assert "stop failed" in lines
    assert "status refresh failed: status down" in lines
|
|
|
|
|
|
def test_unexpected_exception_fails_job_with_exit_code_one(seeded_worker, monkeypatch) -> None:
    """A non-subprocess exception from the facade fails the job with exit code 1."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "delete", server_id=ids.server_one)

    def exploding_delete(server_id, **kwargs):
        raise RuntimeError("boom")

    def unknown_status(name):
        return SimpleNamespace(state="unknown")

    monkeypatch.setattr(l4d2_facade, "delete_server", exploding_delete)
    monkeypatch.setattr(l4d2_facade, "server_status", unknown_status)

    with app.app_context():
        assert run_worker_once() is True

    finished = load_job(job_id)
    assert finished.state == "failed"
    assert finished.exit_code == 1
|
|
|
|
|
|
def test_same_server_jobs_do_not_overlap(seeded_worker, monkeypatch) -> None:
    """A queued job stays queued while another job for the same server is running."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def forbidden_stop(server_id, **kwargs):
        return pytest.fail("must not run")

    monkeypatch.setattr(l4d2_facade, "stop_server", forbidden_stop)

    with app.app_context():
        claimed = run_worker_once()
        assert claimed is False

    assert load_job(queued_id).state == "queued"
|
|
|
|
|
|
def test_same_server_jobs_wait_while_job_is_cancelling(seeded_worker, monkeypatch) -> None:
    """A queued job stays queued while a same-server job is in the ``cancelling`` state."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="cancelling")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def forbidden_stop(server_id, **kwargs):
        return pytest.fail("must not run")

    monkeypatch.setattr(l4d2_facade, "stop_server", forbidden_stop)

    with app.app_context():
        claimed = run_worker_once()
        assert claimed is False

    assert load_job(queued_id).state == "queued"
|
|
|
|
|
|
def test_cancelled_process_finishes_job_as_cancelled(seeded_worker, monkeypatch) -> None:
    """A ``CommandCancelledError`` ends the job as ``cancelled`` with the
    process return code and records the partial-state warning."""
    app, ids = seeded_worker
    job_id = add_job(ids.user, "stop", server_id=ids.server_one)

    def fake_stop(server_id, *, on_stdout=None, on_stderr=None, should_cancel=None):
        assert server_id == ids.server_one
        assert should_cancel is not None
        # Flip the job to "cancelling" in its own session so the change is
        # committed before should_cancel() is consulted.
        with session_scope() as session:
            running_job = session.scalar(select(Job).where(Job.id == job_id))
            assert running_job is not None
            running_job.state = "cancelling"
        assert should_cancel() is True
        on_stderr("terminating")
        raise CommandCancelledError(returncode=-15, cmd=["stop"], output="", stderr="")

    def unknown_status(name):
        return SimpleNamespace(state="unknown")

    monkeypatch.setattr(l4d2_facade, "stop_server", fake_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", unknown_status)

    with app.app_context():
        assert run_worker_once() is True

    with session_scope() as session:
        job = session.scalar(select(Job).where(Job.id == job_id))
        server = session.scalar(select(Server).where(Server.id == ids.server_one))
        lines = [entry.line for entry in job_logs_for(session, job_id)]

    assert job is not None
    assert job.state == "cancelled"
    assert job.exit_code == -15
    assert job.finished_at is not None
    assert server is not None
    assert server.last_error == "job cancelled; runtime state may be partial"
    assert "starting stop for alpha" in lines
    assert "terminating" in lines
    assert "job cancelled; runtime state may be partial" in lines
|
|
|
|
|
|
def test_different_server_jobs_can_be_claimed_while_other_server_runs(seeded_worker, monkeypatch) -> None:
    """A queued job for server two is claimed and succeeds while server one has a running job."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    queued_id = add_job(ids.user, "stop", server_id=ids.server_two)

    def noop_stop(server_id, **kwargs):
        return None

    def stopped_status(name):
        return SimpleNamespace(state="stopped")

    monkeypatch.setattr(l4d2_facade, "stop_server", noop_stop)
    monkeypatch.setattr(l4d2_facade, "server_status", stopped_status)

    with app.app_context():
        assert run_worker_once() is True

    assert load_job(queued_id).state == "succeeded"
|
|
|
|
|
|
def test_install_job_not_claimed_while_server_job_runs(seeded_worker) -> None:
    """A queued install job is left alone while any server job is running."""
    app, ids = seeded_worker
    add_job(ids.user, "start", server_id=ids.server_one, state="running")
    install_id = add_job(ids.user, "install", server_id=None)

    with app.app_context():
        claimed = run_worker_once()
        assert claimed is False

    assert load_job(install_id).state == "queued"
|
|
|
|
|
|
def test_server_job_not_claimed_while_install_runs(seeded_worker) -> None:
    """A queued server job is left alone while an install job is running."""
    app, ids = seeded_worker
    add_job(ids.user, "install", server_id=None, state="running")
    queued_id = add_job(ids.user, "initialize", server_id=ids.server_one)

    with app.app_context():
        claimed = run_worker_once()
        assert claimed is False

    assert load_job(queued_id).state == "queued"
|
|
|
|
|
|
def test_recover_stale_running_jobs(worker_app) -> None:
    """``recover_stale_jobs`` reports one recovered job and marks it failed and finished."""
    with session_scope() as session:
        owner = User(username="bob", password_digest=hash_password("secret"), admin=False)
        session.add(owner)
        session.flush()

        stale = Job(user_id=owner.id, server_id=None, operation="install", state="running")
        session.add(stale)
        session.flush()
        job_id = stale.id

    with worker_app.app_context():
        recovered_count = recover_stale_jobs()
        assert recovered_count == 1

    recovered = load_job(job_id)
    assert recovered.state == "failed"
    assert recovered.finished_at is not None
|
|
|
|
|
|
def test_worker_startup_skipped_during_testing(monkeypatch, tmp_path) -> None:
    """``create_app`` with ``TESTING=True`` must not call ``start_job_workers``."""
    from l4d2web import app as app_module

    called = []
    database_file = tmp_path / "startup.db"
    db_url = f"sqlite:///{database_file}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))

    config = {"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}
    app_module.create_app(config)

    assert called == []
|
|
|
|
|
|
def test_worker_startup_when_enabled_outside_testing(monkeypatch, tmp_path) -> None:
    """``create_app`` with ``TESTING=False`` calls ``start_job_workers`` once with the new app."""
    from l4d2web import app as app_module

    called = []
    database_file = tmp_path / "startup-enabled.db"
    db_url = f"sqlite:///{database_file}"
    monkeypatch.setattr(app_module, "start_job_workers", lambda app: called.append(app))

    config = {"TESTING": False, "DATABASE_URL": db_url, "SECRET_KEY": "test"}
    app = app_module.create_app(config)

    assert called == [app]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Scheduler truth table for the new operations (build_overlay,
# refresh_workshop_items) and their interaction with existing ops.
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_install_blocks_build_overlay_and_refresh() -> None:
    """While an install runs, neither build_overlay nor refresh may start."""
    state = SchedulerState(install_running=True)
    blocked_jobs = (
        DummyJob(operation="build_overlay", overlay_id=1),
        DummyJob(operation="refresh_workshop_items"),
    )
    for job in blocked_jobs:
        assert can_start(job, state) is False
|
|
|
|
|
|
def test_refresh_blocks_install_build_overlay_and_servers() -> None:
    """While a refresh runs, install, build_overlay and server jobs are all blocked."""
    state = SchedulerState(refresh_running=True)
    blocked_jobs = (
        DummyJob(operation="install"),
        DummyJob(operation="build_overlay", overlay_id=1),
        DummyJob(operation="start", server_id=1),
    )
    for job in blocked_jobs:
        assert can_start(job, state) is False
|
|
|
|
|
|
def test_build_overlay_blocks_same_overlay_only() -> None:
    """A running build for overlay 7 blocks another build of 7 but not a build of 8."""
    state = SchedulerState()
    state.running_overlays.add(7)

    same_overlay = DummyJob(operation="build_overlay", overlay_id=7)
    other_overlay = DummyJob(operation="build_overlay", overlay_id=8)

    assert can_start(same_overlay, state) is False
    assert can_start(other_overlay, state) is True
|
|
|
|
|
|
def test_install_blocked_by_active_build_overlay() -> None:
    """An install may not start while any overlay build is running."""
    state = SchedulerState()
    state.running_overlays.add(7)

    install = DummyJob(operation="install")
    assert can_start(install, state) is False
|
|
|
|
|
|
def test_refresh_blocked_by_active_build_overlay() -> None:
    """A workshop refresh may not start while any overlay build is running."""
    state = SchedulerState()
    state.running_overlays.add(7)

    refresh = DummyJob(operation="refresh_workshop_items")
    assert can_start(refresh, state) is False
|
|
|
|
|
|
def test_server_job_blocked_when_blueprint_overlay_is_building() -> None:
    """Only servers listed in ``blocked_servers_by_overlay`` are held back by a running build."""
    state = SchedulerState()
    state.running_overlays.add(7)
    state.blocked_servers_by_overlay.add(42)

    blocked_server = DummyJob(operation="start", server_id=42)
    assert can_start(blocked_server, state) is False

    # Other servers (whose blueprints don't reference overlay 7) are NOT blocked.
    free_server = DummyJob(operation="start", server_id=43)
    assert can_start(free_server, state) is True
|
|
|
|
|
|
@pytest.fixture
def overlay_seeded_worker(seeded_worker):
    """Extend ``seeded_worker`` with a workshop overlay attached to the seeded blueprint.

    Server two is moved to a second blueprint that has no overlay, and the
    overlay's id is exposed as ``ids.overlay``.
    """
    app, ids = seeded_worker
    with session_scope() as db:
        workshop_overlay = Overlay(name="ws", path="9", type="workshop", user_id=ids.user)
        db.add(workshop_overlay)
        db.flush()

        # Move server_two onto a different blueprint with NO workshop overlay,
        # so the test can distinguish "blocked by overlay build" from "any
        # server is blocked".
        seeded_blueprint = db.scalar(select(Blueprint).where(Blueprint.user_id == ids.user))
        link = BlueprintOverlay(
            blueprint_id=seeded_blueprint.id,
            overlay_id=workshop_overlay.id,
            position=0,
        )
        db.add(link)

        plain_blueprint = Blueprint(user_id=ids.user, name="no-overlay", arguments="[]", config="[]")
        db.add(plain_blueprint)
        db.flush()

        second_server = db.scalar(select(Server).where(Server.id == ids.server_two))
        second_server.blueprint_id = plain_blueprint.id
        ids.overlay = workshop_overlay.id
    return app, ids
|
|
|
|
|
|
def test_scheduler_state_finds_servers_blocked_by_running_build(overlay_seeded_worker) -> None:
    """A running build marks its overlay as running and blocks only server one."""
    app, ids = overlay_seeded_worker
    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)

    with session_scope() as db:
        state = build_scheduler_state(db)

    blocked = state.blocked_servers_by_overlay
    assert ids.overlay in state.running_overlays
    assert ids.server_one in blocked
    assert ids.server_two not in blocked
|
|
|
|
|
|
def test_enqueue_build_overlay_creates_new_job_when_none_pending(overlay_seeded_worker) -> None:
    """With nothing pending, enqueueing returns a queued build_overlay job for the overlay."""
    app, ids = overlay_seeded_worker

    with session_scope() as db:
        created = enqueue_build_overlay(db, overlay_id=ids.overlay, user_id=ids.user)
        assert created.operation == "build_overlay"
        assert created.overlay_id == ids.overlay
        assert created.server_id is None
        assert created.state == "queued"
|
|
|
|
|
|
def test_enqueue_build_overlay_coalesces_against_pending(overlay_seeded_worker) -> None:
    """Enqueueing twice for the same overlay returns the same job and leaves one row."""
    app, ids = overlay_seeded_worker

    with session_scope() as db:
        first_id = enqueue_build_overlay(db, overlay_id=ids.overlay, user_id=ids.user).id

    with session_scope() as db:
        second = enqueue_build_overlay(db, overlay_id=ids.overlay, user_id=ids.user)
        assert second.id == first_id, "should coalesce against the pending job"

    with session_scope() as db:
        matching = db.query(Job).filter_by(operation="build_overlay", overlay_id=ids.overlay)
        n = matching.count()
        assert n == 1
|
|
|
|
|
|
def test_enqueue_build_overlay_does_not_coalesce_against_running(overlay_seeded_worker) -> None:
    """A running build is not reused: enqueueing adds a separate queued job beside it."""
    app, ids = overlay_seeded_worker
    add_job(ids.user, "build_overlay", server_id=None, state="running", overlay_id=ids.overlay)

    with session_scope() as db:
        new_job = enqueue_build_overlay(db, overlay_id=ids.overlay, user_id=ids.user)
        assert new_job.state == "queued"

    def builds_in_state(job_state):
        # Select build_overlay jobs for this overlay in the given state.
        return select(Job).where(
            Job.operation == "build_overlay",
            Job.overlay_id == ids.overlay,
            Job.state == job_state,
        )

    with session_scope() as db:
        running = db.scalars(builds_in_state("running")).all()
        queued = db.scalars(builds_in_state("queued")).all()
        assert len(running) == 1
        assert len(queued) == 1
|
|
|
|
|
|
def test_run_worker_once_dispatches_build_overlay(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
    """A queued build_overlay job succeeds and leaves a symlink to the cached
    item under the overlay's ``left4dead2/addons`` directory."""
    import os

    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))

    with session_scope() as db:
        item = WorkshopItem(
            steam_id="1001",
            title="A",
            filename="a.vpk",
            file_url="u",
            file_size=3,
            time_updated=1,
        )
        db.add(item)
        db.flush()
        db.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=item.id))

    # Cached file: 3 bytes with atime/mtime forced to 1, mirroring the
    # file_size=3 / time_updated=1 recorded on the row above.
    cache_dir = tmp_path / "workshop_cache"
    cache_dir.mkdir()
    cached_vpk = cache_dir / "1001.vpk"
    cached_vpk.write_bytes(b"abc")
    os.utime(cached_vpk, (1, 1))

    # Mark item as downloaded.
    with session_scope() as db:
        stored = db.query(WorkshopItem).filter_by(steam_id="1001").one()
        stored.last_downloaded_at = datetime.now(UTC)

    job_id = add_job(ids.user, "build_overlay", server_id=None, overlay_id=ids.overlay)

    with app.app_context():
        assert run_worker_once() is True

    job = load_job(job_id)
    assert job.state == "succeeded", job.exit_code
    # "9" is the overlay path set by the overlay_seeded_worker fixture.
    addons = tmp_path / "overlays" / "9" / "left4dead2" / "addons"
    assert (addons / "1001.vpk").is_symlink()
|
|
|
|
|
|
def test_run_worker_once_dispatches_refresh(overlay_seeded_worker, monkeypatch, tmp_path) -> None:
    """A queued refresh job calls the patched refresh runner exactly once and succeeds."""
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))

    from l4d2web.services import steam_workshop, job_worker

    refresh_calls = []

    def fake_refresh_workshop_items(*, on_stdout, on_stderr, should_cancel):
        refresh_calls.append(True)
        on_stdout("refresh phase complete (downloaded=0 errors=0)")
        # no overlays affected
        return []

    monkeypatch.setattr(job_worker, "_run_refresh_workshop_items", fake_refresh_workshop_items)

    job_id = add_job(ids.user, "refresh_workshop_items", server_id=None)

    with app.app_context():
        assert run_worker_once() is True

    assert refresh_calls == [True]
    finished = load_job(job_id)
    assert finished.state == "succeeded"
|
|
|
|
|
|
def test_refresh_downloads_serially_to_keep_web_worker_responsive(
    overlay_seeded_worker, monkeypatch, tmp_path
) -> None:
    """The refresh runner calls ``refresh_all`` once with ``executor_workers=1``
    and a cancel callback, and reports the overlay as affected."""
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))

    from l4d2web.services import job_worker, steam_workshop

    # Stored item is the "old" revision; the fetched metadata below is newer.
    stored_fields = {
        "steam_id": "1001",
        "title": "A",
        "filename": "old.vpk",
        "file_url": "https://example.com/old.vpk",
        "file_size": 1,
        "time_updated": 1,
    }
    with session_scope() as db:
        item = WorkshopItem(**stored_fields)
        db.add(item)
        db.flush()
        db.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=item.id))

    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="A",
        filename="new.vpk",
        file_url="https://example.com/new.vpk",
        file_size=2,
        time_updated=2,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )

    def fake_fetch_metadata_batch(steam_ids, *, mode):
        return [meta]

    refresh_calls = []

    def fake_refresh_all(metas, cache_root, *, executor_workers=8, should_cancel=None):
        # Record what the worker passed so the assertion can pin the pool size.
        has_cancel_hook = should_cancel is not None
        refresh_calls.append((list(metas), executor_workers, has_cancel_hook))
        return steam_workshop.RefreshReport(downloaded=1, errors=0)

    monkeypatch.setattr(steam_workshop, "fetch_metadata_batch", fake_fetch_metadata_batch)
    monkeypatch.setattr(steam_workshop, "refresh_all", fake_refresh_all)

    with app.app_context():
        affected = job_worker._run_refresh_workshop_items(
            on_stdout=lambda _line: None,
            on_stderr=lambda _line: None,
            should_cancel=lambda: False,
        )

    assert affected == [ids.overlay]
    assert refresh_calls == [([meta], 1, True)]
|
|
|
|
|
|
def test_refresh_job_enqueues_build_overlay_without_locking_its_final_log(
    overlay_seeded_worker, monkeypatch, tmp_path
) -> None:
    """A refresh job that downloads an update succeeds, leaves a queued
    build_overlay job for the overlay, and logs the enqueue summary line."""
    app, ids = overlay_seeded_worker
    monkeypatch.setenv("LEFT4ME_ROOT", str(tmp_path))

    from l4d2web.services import steam_workshop

    # Stored item is the "old" revision; the fetched metadata below is newer.
    stored_fields = {
        "steam_id": "1001",
        "title": "A",
        "filename": "old.vpk",
        "file_url": "https://example.com/old.vpk",
        "file_size": 1,
        "time_updated": 1,
    }
    with session_scope() as db:
        item = WorkshopItem(**stored_fields)
        db.add(item)
        db.flush()
        db.add(OverlayWorkshopItem(overlay_id=ids.overlay, workshop_item_id=item.id))

    meta = steam_workshop.WorkshopMetadata(
        steam_id="1001",
        title="A",
        filename="new.vpk",
        file_url="https://example.com/new.vpk",
        file_size=2,
        time_updated=2,
        preview_url="",
        consumer_app_id=550,
        result=1,
    )

    def fake_fetch_metadata_batch(steam_ids, *, mode):
        return [meta]

    def fake_refresh_all(metas, cache_root, **kwargs):
        return steam_workshop.RefreshReport(downloaded=1, errors=0)

    monkeypatch.setattr(steam_workshop, "fetch_metadata_batch", fake_fetch_metadata_batch)
    monkeypatch.setattr(steam_workshop, "refresh_all", fake_refresh_all)

    job_id = add_job(ids.user, "refresh_workshop_items", server_id=None)

    with app.app_context():
        assert run_worker_once() is True

    queued_build = select(Job).where(
        Job.operation == "build_overlay",
        Job.overlay_id == ids.overlay,
        Job.state == "queued",
    )
    with session_scope() as db:
        job = db.scalar(select(Job).where(Job.id == job_id))
        build_job = db.scalar(queued_build)
        lines = [entry.line for entry in job_logs_for(db, job_id)]

    assert job is not None
    assert job.state == "succeeded"
    assert build_job is not None
    assert "enqueued build_overlay for 1 overlay(s)" in lines
|