"""Routes for the overlay 'Files' section. Read-only endpoints (any overlay): - `GET /overlays//files?path=` — HTML fragment listing one directory level. Used both for the initial server-rendered root and for HTMX swaps when a folder expands. - `GET /overlays//files/download?path=` — streams a single file. Symlinks resolving anywhere under `LEFT4ME_ROOT` are allowed (so workshop addons stream from the shared cache); anything escaping it is refused. Mutating endpoints (only `overlay.type == 'files'`, owner or admin): - `GET /overlays//files/content?path=` — JSON `{path, content}` for an editable text file, 415 if not editable. - `POST /overlays//files/save` — JSON `{path, content, new_path?}`, text-mode write with optional atomic rename. - `POST /overlays//files/replace` — multipart `path`, `file`, optional `new_path`. Binary-mode replace with optional atomic rename. - `POST /overlays//files/upload` — multipart `target_path`, single `file` (carrying `webkitRelativePath`). One file per request; client fans out for multi-file or whole-folder drops. - `POST /overlays//files/move` — JSON `{src, dst}`. Internal-drag rename or move; refuses cycles via `safe_resolve_for_move`. - `POST /overlays//files/mkdir` — JSON `{path}`. Slashes allowed in `path`; intermediate directories are created. - `POST /overlays//files/delete` — form `path`. Refuses non-empty directories. - `GET /overlays//files/download_zip?path=` — streams a zip of the folder's contents. """ from __future__ import annotations import io import os import shutil import tempfile import zipfile from flask import ( Blueprint, Response, jsonify, render_template, request, send_file, ) from sqlalchemy import select from l4d2web.auth import current_user, require_login from l4d2web.db import session_scope from l4d2web.models import Overlay, Server from l4d2web.services.overlay_files import ( is_editable, list_directory, safe_resolve_for_delete, safe_resolve_for_download, safe_resolve_for_listing, safe_resolve_for_move, safe_resolve_for_server_download, safe_resolve_for_server_listing, safe_resolve_for_write, ) # Same caps as is_editable so /save can't sneak in something the listing # would have flagged non-editable. _SAVE_MAX_BYTES = 1 * 1024 * 1024 # Per-upload byte cap — keeps a single hostile request from filling disk. # 200 MiB picked generously for vpks; tune later if needed. _UPLOAD_MAX_BYTES = 200 * 1024 * 1024 bp = Blueprint("files", __name__) def _load_overlay_for_user(overlay_id: int, user) -> Overlay | Response: with session_scope() as db: overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id)) if overlay is None: return Response(status=404) if not user.admin and overlay.user_id is not None and overlay.user_id != user.id: return Response(status=403) # Detach by expunging — caller only reads scalar columns we already # populated, so lazy loads aren't a concern. db.expunge(overlay) return overlay def _load_files_overlay(overlay_id: int, user) -> Overlay | Response: """Like `_load_overlay_for_user`, but additionally 404s for any overlay whose type isn't `files`. Mutating endpoints use this; read-only ones keep working across all types.""" result = _load_overlay_for_user(overlay_id, user) if isinstance(result, Response): return result if result.type != "files": return Response(status=404) return result @bp.get("/overlays//files") @require_login def overlay_files_fragment(overlay_id: int): user = current_user() assert user is not None sub_path = request.args.get("path", "") result = _load_overlay_for_user(overlay_id, user) if isinstance(result, Response): return result overlay = result try: target = safe_resolve_for_listing(overlay.path, sub_path) overlay_root = safe_resolve_for_listing(overlay.path, "") except ValueError: return Response("invalid path", status=400) if not target.is_dir(): return Response(status=404) entries, truncated_count = list_directory(target, overlay_root) return render_template( "_overlay_file_tree.html", overlay=overlay, entries=entries, truncated=truncated_count > 0, truncated_count=truncated_count, files_base_url=f"/overlays/{overlay_id}", download_supported=True, files_overlay=(overlay.type == "files"), ) @bp.get("/overlays//files/download") @require_login def overlay_files_download(overlay_id: int): user = current_user() assert user is not None sub_path = request.args.get("path", "") result = _load_overlay_for_user(overlay_id, user) if isinstance(result, Response): return result overlay = result try: real = safe_resolve_for_download(overlay.path, sub_path) except ValueError: return Response("invalid path", status=400) if not real.exists(): return Response(status=404) if real.is_dir(): return Response("not a file", status=400) return send_file( str(real), as_attachment=True, download_name=os.path.basename(real) ) @bp.get("/servers//files") @require_login def server_files_fragment(server_id: int): """Listing-only file tree for a server's runtime merged view. No download counterpart — runtime holds large game binaries the user doesn't need to pull down through the web app. """ user = current_user() assert user is not None sub_path = request.args.get("path", "") with session_scope() as db: server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id)) if server is None: return Response(status=404) try: target = safe_resolve_for_server_listing(server_id, sub_path) merged_root = safe_resolve_for_server_listing(server_id, "") except ValueError: return Response("invalid path", status=400) if target is None or merged_root is None: return Response(status=404) if not target.is_dir(): return Response(status=404) entries, truncated_count = list_directory(target, merged_root) return render_template( "_overlay_file_tree.html", entries=entries, truncated=truncated_count > 0, truncated_count=truncated_count, files_base_url=f"/servers/{server_id}", download_supported=True, ) @bp.get("/overlays//files/content") @require_login def overlay_file_content(overlay_id: int): """Return `{path, content}` for an editable text file.""" user = current_user() assert user is not None sub_path = request.args.get("path", "") result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result try: target = safe_resolve_for_listing(overlay.path, sub_path) except ValueError: return Response("invalid path", status=400) if not target.exists() or not target.is_file(): return Response(status=404) if not is_editable(target): return Response("not editable", status=415) try: content = target.read_text(encoding="utf-8") except OSError: return Response("read failed", status=500) except UnicodeDecodeError: # is_editable sniffed only the first 8 KiB; the tail can still fail. return Response("not editable", status=415) return jsonify({"path": sub_path, "content": content}) def _validate_save_content(content: str) -> Response | None: if len(content.encode("utf-8")) > _SAVE_MAX_BYTES: return Response("content exceeds 1 MiB", status=413) if "\x00" in content: return Response("content contains NUL bytes", status=415) return None @bp.post("/overlays//files/save") @require_login def overlay_file_save(overlay_id: int): """Write text content to `path`. Optional `new_path` performs a rename in the same call (atomic: rename then write; both succeed or neither).""" user = current_user() assert user is not None payload = request.get_json(silent=True) or {} path = (payload.get("path") or "").strip() new_path = payload.get("new_path") new_path = new_path.strip() if isinstance(new_path, str) and new_path.strip() else None content = payload.get("content") if not path or not isinstance(content, str): return Response("missing path or content", status=400) err = _validate_save_content(content) if err is not None: return err result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result try: write_target = safe_resolve_for_write(overlay.path, new_path or path) except ValueError as exc: return Response(str(exc), status=422) # Rename branch: source must exist, dst must not collide. if new_path is not None and new_path != path: try: src_path, dst_path = safe_resolve_for_move(overlay.path, path, new_path) except ValueError as exc: return Response(str(exc), status=422) if dst_path.exists(): return Response("destination already exists", status=409) dst_path.parent.mkdir(parents=True, exist_ok=True) os.rename(src_path, dst_path) write_target = dst_path else: write_target.parent.mkdir(parents=True, exist_ok=True) # Creation branch: must not collide with an existing path. if write_target.exists() and not _is_existing_file(write_target): return Response("destination is not a file", status=409) try: write_target.write_text(content, encoding="utf-8") except OSError as exc: return Response(f"write failed: {exc}", status=500) return jsonify({"path": new_path or path}) def _is_existing_file(path) -> bool: return path.is_file() and not path.is_symlink() @bp.post("/overlays//files/replace") @require_login def overlay_file_replace(overlay_id: int): """Replace the bytes of `path` with the uploaded `file`. Optional `new_path` performs an atomic rename in the same call.""" user = current_user() assert user is not None path = (request.form.get("path") or "").strip() new_path = (request.form.get("new_path") or "").strip() or None upload = request.files.get("file") if not path or upload is None: return Response("missing path or file", status=400) result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result if new_path and new_path != path: try: src_path, dst_path = safe_resolve_for_move(overlay.path, path, new_path) except ValueError as exc: return Response(str(exc), status=422) if dst_path.exists(): return Response("destination already exists", status=409) dst_path.parent.mkdir(parents=True, exist_ok=True) os.rename(src_path, dst_path) write_target = dst_path echo_path = new_path else: try: write_target = safe_resolve_for_write(overlay.path, path) except ValueError as exc: return Response(str(exc), status=422) write_target.parent.mkdir(parents=True, exist_ok=True) echo_path = path return _stream_upload_into(upload, write_target, echo_path) def _stream_upload_into(upload, write_target, echo_path: str) -> Response: """Write the multipart upload's stream into `write_target`, enforcing the per-upload size cap. Cleans up the partial file on failure or over-cap so a cancelled / oversized upload doesn't leak bytes. `echo_path` is what the route reports back as the canonical relative path for the new file (the rename target if a rename happened, else the original path). The function doesn't recompute this so it can be passed through verbatim. """ tmp_dir = write_target.parent tmp_dir.mkdir(parents=True, exist_ok=True) fd, tmp_name = tempfile.mkstemp(prefix=".upload-", dir=str(tmp_dir)) bytes_seen = 0 try: with os.fdopen(fd, "wb") as out: while True: chunk = upload.stream.read(64 * 1024) if not chunk: break bytes_seen += len(chunk) if bytes_seen > _UPLOAD_MAX_BYTES: raise _UploadTooLarge() out.write(chunk) os.replace(tmp_name, write_target) except _UploadTooLarge: try: os.unlink(tmp_name) except OSError: pass return Response("upload exceeds size cap", status=413) except OSError as exc: try: os.unlink(tmp_name) except OSError: pass return Response(f"upload failed: {exc}", status=500) return jsonify({"path": echo_path}) class _UploadTooLarge(Exception): pass def _join_clean(folder: str, leaf: str) -> str: """Join target_path + relative path safely, trimming slashes.""" folder = (folder or "").strip("/").strip() leaf = (leaf or "").strip("/").strip() if folder and leaf: return f"{folder}/{leaf}" return folder or leaf @bp.post("/overlays//files/upload") @require_login def overlay_file_upload(overlay_id: int): """Single-file upload. Multi-file or whole-folder drops fan out client side into one POST per file, each carrying its `webkitRelativePath` in `relative_path`. Conflicts return 409 unless `overwrite=1`.""" user = current_user() assert user is not None target_folder = (request.form.get("target_path") or "").strip() relative_path = (request.form.get("relative_path") or "").strip() overwrite = request.form.get("overwrite") == "1" upload = request.files.get("file") if upload is None: return Response("missing file", status=400) # Filename fallback for browsers that don't send relative_path. filename = relative_path or (upload.filename or "").strip() if not filename: return Response("missing filename", status=400) # Normalise: strip any DOS-style path components from the filename. filename = filename.replace("\\", "/") full_rel = _join_clean(target_folder, filename) if not full_rel: return Response("missing destination path", status=400) result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result try: write_target = safe_resolve_for_write(overlay.path, full_rel) except ValueError as exc: return Response(str(exc), status=422) if write_target.exists() and not overwrite: return Response("file already exists", status=409) write_target.parent.mkdir(parents=True, exist_ok=True) return _stream_upload_into(upload, write_target, full_rel) @bp.post("/overlays//files/move") @require_login def overlay_file_move(overlay_id: int): """Rename / move a file or folder.""" user = current_user() assert user is not None payload = request.get_json(silent=True) or {} src = (payload.get("src") or "").strip() dst = (payload.get("dst") or "").strip() overwrite = bool(payload.get("overwrite")) if not src or not dst: return Response("missing src or dst", status=400) result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result try: src_path, dst_path = safe_resolve_for_move(overlay.path, src, dst) except ValueError as exc: return Response(str(exc), status=422) if dst_path.exists() and not overwrite: return Response("destination already exists", status=409) dst_path.parent.mkdir(parents=True, exist_ok=True) try: if dst_path.exists() and overwrite: if dst_path.is_dir() and not dst_path.is_symlink(): shutil.rmtree(dst_path) else: os.unlink(dst_path) os.rename(src_path, dst_path) except OSError as exc: return Response(f"move failed: {exc}", status=500) return jsonify({"src": src, "dst": dst}) @bp.post("/overlays//files/mkdir") @require_login def overlay_file_mkdir(overlay_id: int): """Create empty directory `path`. Slashes in `path` create intermediates.""" user = current_user() assert user is not None payload = request.get_json(silent=True) or {} path = (payload.get("path") or "").strip("/").strip() if not path: return Response("missing path", status=400) result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result try: target = safe_resolve_for_write(overlay.path, path) except ValueError as exc: return Response(str(exc), status=422) if target.exists(): if target.is_dir() and not target.is_symlink(): # Idempotent — folder already there. return jsonify({"path": path}) return Response("destination already exists and is not a directory", status=409) try: target.mkdir(parents=True, exist_ok=False) except FileExistsError: return Response("directory already exists", status=409) except OSError as exc: return Response(f"mkdir failed: {exc}", status=500) return jsonify({"path": path}) @bp.post("/overlays//files/delete") @require_login def overlay_file_delete(overlay_id: int): """Delete a file or empty folder. Refuses recursive directory removal.""" user = current_user() assert user is not None path = (request.form.get("path") or "").strip() if not path: return Response("missing path", status=400) result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result try: target = safe_resolve_for_delete(overlay.path, path) except ValueError as exc: return Response(str(exc), status=422) if not target.exists() and not target.is_symlink(): return Response(status=404) try: if target.is_dir() and not target.is_symlink(): try: target.rmdir() except OSError: return Response("directory is not empty", status=409) else: os.unlink(target) except OSError as exc: return Response(f"delete failed: {exc}", status=500) return jsonify({"path": path}) @bp.get("/overlays//files/download_zip") @require_login def overlay_file_download_zip(overlay_id: int): """Stream a zip of the folder at `path` (or the overlay root). Symlinks are written as their resolved file content (matches the regular download endpoint's behavior — workshop-cache symlinks streamed as bytes).""" user = current_user() assert user is not None sub_path = request.args.get("path", "") result = _load_files_overlay(overlay_id, user) if isinstance(result, Response): return result overlay = result try: target = safe_resolve_for_listing(overlay.path, sub_path) except ValueError: return Response("invalid path", status=400) if not target.exists() or not target.is_dir(): return Response(status=404) folder_name = os.path.basename(str(target)) or "overlay" download_name = f"{folder_name}.zip" buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(target, followlinks=False): for name in files: abs_path = os.path.join(root, name) rel = os.path.relpath(abs_path, str(target)) try: zf.write(abs_path, arcname=rel) except OSError: continue # Include empty directories so the structure round-trips. for name in dirs: abs_dir = os.path.join(root, name) rel_dir = os.path.relpath(abs_dir, str(target)) + "/" if not any(True for _ in os.scandir(abs_dir)): zf.writestr(zipfile.ZipInfo(rel_dir), b"") buffer.seek(0) return send_file( buffer, mimetype="application/zip", as_attachment=True, download_name=download_name, ) @bp.get("/servers//files/download") @require_login def server_files_download(server_id: int): user = current_user() assert user is not None sub_path = request.args.get("path", "") with session_scope() as db: server = db.scalar(select(Server).where(Server.id == server_id, Server.user_id == user.id)) if server is None: return Response(status=404) try: real = safe_resolve_for_server_download(server_id, sub_path) except ValueError: return Response("invalid path", status=400) if real is None or not real.exists(): return Response(status=404) if real.is_dir(): return Response("not a file", status=400) return send_file( str(real), as_attachment=True, download_name=os.path.basename(real) )