left4me/l4d2web/tests/test_script_overlay_routes.py
mwiegand a62f26ba4a
fix(l4d2-web): normalize CRLF to LF in script overlay POST
HTML <textarea> form submission encodes line breaks as CRLF per spec.
Storing those CRLFs unchanged means every line of the script reaches
bash with a trailing \r, which bash treats as part of the argument —
turning "ls /" into "ls /\r" and failing. Normalize CRLF/CR → LF in the
/overlays/{id}/script handler so storage and the sandbox tmpfile are
LF-only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 16:20:10 +02:00

273 lines
8.9 KiB
Python

"""Routes for type='script' overlays: create, /script (update body),
/wipe, /build. Permissions mirror workshop overlays (owner or admin)."""
from __future__ import annotations
import pytest
from sqlalchemy import select
from l4d2web.app import create_app
from l4d2web.auth import hash_password
from l4d2web.db import init_db, session_scope
from l4d2web.models import Job, Overlay, User
@pytest.fixture
def app(tmp_path, monkeypatch):
    """Create the Flask app under test against a per-test SQLite file.

    DATABASE_URL and LEFT4ME_ROOT are exported via the environment (both
    rooted in ``tmp_path``) before the app is built, and ``init_db()`` is
    called before the app is handed to the test.
    """
    database_file = tmp_path / "script-routes.db"
    db_url = f"sqlite:///{database_file}"
    environment = (
        ("DATABASE_URL", db_url),
        ("LEFT4ME_ROOT", str(tmp_path)),
    )
    for variable, value in environment:
        monkeypatch.setenv(variable, value)
    config = {"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}
    flask_app = create_app(config)
    init_db()
    return flask_app
@pytest.fixture
def alice_id(app) -> int:
    """Insert the non-admin user ``alice`` and return her id."""
    alice = User(
        username="alice",
        password_digest=hash_password("x"),
        admin=False,
    )
    with session_scope() as db:
        db.add(alice)
        # Flush so the row is written and the id is available to return.
        db.flush()
        return alice.id
@pytest.fixture
def bob_id(app) -> int:
    """Insert the non-admin user ``bob`` and return his id."""
    bob = User(
        username="bob",
        password_digest=hash_password("x"),
        admin=False,
    )
    with session_scope() as db:
        db.add(bob)
        # Flush so the row is written and the id is available to return.
        db.flush()
        return bob.id
@pytest.fixture
def admin_id(app) -> int:
    """Insert the user ``admin`` with the admin flag set and return its id."""
    administrator = User(
        username="admin",
        password_digest=hash_password("x"),
        admin=True,
    )
    with session_scope() as db:
        db.add(administrator)
        # Flush so the row is written and the id is available to return.
        db.flush()
        return administrator.id
def _client_for(app, user_id: int):
    """Return a test client whose session carries ``user_id``.

    The session is also seeded with the CSRF token ``"test-token"``, which
    the requests in this module send back in the ``X-CSRF-Token`` header.
    """
    test_client = app.test_client()
    with test_client.session_transaction() as cookie_session:
        cookie_session["csrf_token"] = "test-token"
        cookie_session["user_id"] = user_id
    return test_client
def _create_script_overlay(app, user_id: int, *, name: str = "x") -> int:
    """Create a script overlay through ``POST /overlays`` and return its id.

    Args:
        app: The Flask application under test.
        user_id: Id stored in the session of the client issuing the request.
        name: Overlay name to submit; also used to look the row up again.

    Returns:
        The id of the overlay row whose name equals ``name``.

    Raises:
        AssertionError: If the POST does not answer 302 (the response body is
            included in the message) or if no overlay named ``name`` exists
            afterwards.
    """
    client = _client_for(app, user_id)
    response = client.post(
        "/overlays",
        data={"name": name, "type": "script"},
        headers={"X-CSRF-Token": "test-token"},
    )
    assert response.status_code == 302, response.get_data(as_text=True)
    with session_scope() as s:
        overlay_id = s.scalar(select(Overlay.id).where(Overlay.name == name))
    # scalar() yields None when no row matches; fail here with a clear message
    # instead of handing None to callers that expect an int.
    assert overlay_id is not None, f"overlay {name!r} was not persisted"
    return overlay_id
def test_create_script_overlay(app, alice_id) -> None:
    """Creating a script overlay redirects and stores a row with an empty
    script, an empty build status, the creator as owner and the id as path."""
    form = {"name": "first", "type": "script"}
    csrf_headers = {"X-CSRF-Token": "test-token"}
    response = _client_for(app, alice_id).post(
        "/overlays", data=form, headers=csrf_headers
    )
    assert response.status_code == 302

    with session_scope() as db:
        created = db.query(Overlay).filter_by(name="first").one()
        assert created.type == "script"
        assert created.script == ""
        assert created.last_build_status == ""
        assert created.user_id == alice_id
        assert created.path == str(created.id)
def test_admin_creates_system_wide_script_overlay(app, admin_id) -> None:
    """An admin posting ``system_wide=1`` gets an overlay with no owner."""
    form = {"name": "system", "type": "script", "system_wide": "1"}
    csrf_headers = {"X-CSRF-Token": "test-token"}
    response = _client_for(app, admin_id).post(
        "/overlays", data=form, headers=csrf_headers
    )
    assert response.status_code == 302

    with session_scope() as db:
        owner = db.query(Overlay).filter_by(name="system").one().user_id
        assert owner is None
def test_non_admin_system_wide_flag_is_ignored(app, alice_id) -> None:
    """A non-admin posting ``system_wide=1`` still ends up as the owner."""
    form = {"name": "evil", "type": "script", "system_wide": "1"}
    csrf_headers = {"X-CSRF-Token": "test-token"}
    response = _client_for(app, alice_id).post(
        "/overlays", data=form, headers=csrf_headers
    )
    assert response.status_code == 302

    with session_scope() as db:
        owner = db.query(Overlay).filter_by(name="evil").one().user_id
        assert owner == alice_id
def test_update_script_body_enqueues_build(app, alice_id) -> None:
    """Saving a script body stores it and leaves one build_overlay job; a
    second save right after still leaves exactly one such job."""
    overlay_id = _create_script_overlay(app, alice_id)
    client = _client_for(app, alice_id)
    script_url = f"/overlays/{overlay_id}/script"
    csrf_headers = {"X-CSRF-Token": "test-token"}

    first = client.post(
        script_url, data={"script": "echo new"}, headers=csrf_headers
    )
    assert first.status_code == 302
    with session_scope() as db:
        stored = db.query(Overlay).filter_by(id=overlay_id).one()
        assert stored.script == "echo new"
        build_jobs = (
            db.query(Job)
            .filter_by(operation="build_overlay", overlay_id=overlay_id)
            .all()
        )
        assert len(build_jobs) == 1

    # Coalesce against pending.
    second = client.post(
        script_url, data={"script": "echo newer"}, headers=csrf_headers
    )
    assert second.status_code == 302
    with session_scope() as db:
        build_jobs = (
            db.query(Job)
            .filter_by(operation="build_overlay", overlay_id=overlay_id)
            .all()
        )
        assert len(build_jobs) == 1
def test_update_script_normalizes_crlf_to_lf(app, alice_id) -> None:
    """HTML <textarea> submits CRLF line endings; bash chokes on trailing \\r
    in every command. Storage must be LF-only so the sandbox tmpfile is
    well-formed."""
    overlay_id = _create_script_overlay(app, alice_id)
    client = _client_for(app, alice_id)
    response = client.post(
        f"/overlays/{overlay_id}/script",
        data={"script": "ls /\r\necho hello\r\n"},
        headers={"X-CSRF-Token": "test-token"},
    )
    # Check the request was accepted first, so a rejected POST is reported as
    # such rather than as a confusing mismatch in the stored script below.
    assert response.status_code == 302, response.get_data(as_text=True)
    with session_scope() as s:
        overlay = s.query(Overlay).filter_by(id=overlay_id).one()
        assert overlay.script == "ls /\necho hello\n"
        assert "\r" not in overlay.script
def test_manual_rebuild(app, alice_id) -> None:
    """Posting to /build redirects and leaves one build_overlay job; posting
    again right away still leaves exactly one."""
    overlay_id = _create_script_overlay(app, alice_id)
    client = _client_for(app, alice_id)
    build_url = f"/overlays/{overlay_id}/build"

    # Round one enqueues the job; round two must coalesce with it.
    for _round in range(2):
        response = client.post(build_url, headers={"X-CSRF-Token": "test-token"})
        assert response.status_code == 302
        with session_scope() as db:
            build_jobs = (
                db.query(Job)
                .filter_by(operation="build_overlay", overlay_id=overlay_id)
                .all()
            )
            assert len(build_jobs) == 1
def test_wipe_runs_find_delete(app, alice_id, monkeypatch) -> None:
    """/wipe passes the find-delete command to the (stubbed) sandbox runner,
    clears the overlay's last build status and enqueues no build job."""
    from l4d2web.services import overlay_builders

    overlay_id = _create_script_overlay(app, alice_id)

    # Pre-set a "successful" status so we can verify wipe resets it.
    with session_scope() as db:
        target = db.query(Overlay).filter_by(id=overlay_id).one()
        target.last_build_status = "ok"

    captured: dict = {}

    def fake_run(overlay_id_arg, script_text, *, on_stdout, on_stderr, should_cancel):
        # Record what the route asked the sandbox to run; execute nothing.
        captured.update(overlay_id=overlay_id_arg, script=script_text)

    monkeypatch.setattr(overlay_builders, "run_sandboxed_script", fake_run)

    response = _client_for(app, alice_id).post(
        f"/overlays/{overlay_id}/wipe",
        headers={"X-CSRF-Token": "test-token"},
    )
    assert response.status_code == 302
    assert captured["overlay_id"] == overlay_id
    assert captured["script"] == "find /overlay -mindepth 1 -delete"

    with session_scope() as db:
        wiped = db.query(Overlay).filter_by(id=overlay_id).one()
        assert wiped.last_build_status == ""
        # Wipe does NOT auto-enqueue a rebuild.
        build_jobs = (
            db.query(Job)
            .filter_by(operation="build_overlay", overlay_id=overlay_id)
            .all()
        )
        assert len(build_jobs) == 0
def test_wipe_refuses_during_running_build(app, alice_id, monkeypatch) -> None:
    """/wipe answers 409 and never calls the sandbox runner while a
    build_overlay job for the overlay is in state ``running``."""
    from l4d2web.services import overlay_builders

    overlay_id = _create_script_overlay(app, alice_id)

    # Mark a build as running for this overlay.
    running_build = Job(
        user_id=alice_id,
        server_id=None,
        overlay_id=overlay_id,
        operation="build_overlay",
        state="running",
    )
    with session_scope() as db:
        db.add(running_build)

    invocations: list = []

    def fake_run(*args, **kwargs):
        # Any call lands here, so the list stays empty only if wipe is refused.
        invocations.append((args, kwargs))

    monkeypatch.setattr(overlay_builders, "run_sandboxed_script", fake_run)

    response = _client_for(app, alice_id).post(
        f"/overlays/{overlay_id}/wipe",
        headers={"X-CSRF-Token": "test-token"},
    )
    assert response.status_code == 409
    assert invocations == []
def test_permissions_non_owner_denied(app, alice_id, bob_id) -> None:
    """A non-admin who does not own the overlay gets 403 on /script, and the
    rejected request leaves both the stored script and the job queue alone."""
    overlay_id = _create_script_overlay(app, alice_id, name="alice-private")
    bob = _client_for(app, bob_id)
    r1 = bob.post(
        f"/overlays/{overlay_id}/script",
        data={"script": "boom"},
        headers={"X-CSRF-Token": "test-token"},
    )
    assert r1.status_code == 403
    # A 403 alone does not prove the write was blocked: make sure the handler
    # did not persist the body or enqueue a build before rejecting.
    with session_scope() as s:
        overlay = s.query(Overlay).filter_by(id=overlay_id).one()
        assert overlay.script == ""
        jobs = s.query(Job).filter_by(operation="build_overlay", overlay_id=overlay_id).all()
        assert jobs == []
def test_permissions_admin_can_edit_any(app, alice_id, admin_id) -> None:
    """An admin may overwrite the script of an overlay owned by another user."""
    overlay_id = _create_script_overlay(app, alice_id, name="alice-private")
    script_url = f"/overlays/{overlay_id}/script"
    csrf_headers = {"X-CSRF-Token": "test-token"}

    response = _client_for(app, admin_id).post(
        script_url, data={"script": "echo admin"}, headers=csrf_headers
    )
    assert response.status_code == 302

    with session_scope() as db:
        stored_script = db.query(Overlay).filter_by(id=overlay_id).one().script
        assert stored_script == "echo admin"