Adds Overlay.type='files' whose source-of-truth IS the overlay directory
itself. Users can:
* upload arbitrary files / whole folders by dragging from the OS onto a
folder row in the file tree (one POST per file, queue with
concurrency 3, per-file progress in a floating Uploads panel)
* move via drag-and-drop inside the tree (same gesture, source
distinguishes; refuses cycles)
* create / edit / rename / replace through a single editor modal
(text flavor for editable files, binary flavor with replace-upload
for everything else; filename input is the rename surface)
* mkdir empty folders (slashes allowed for nested intermediates)
* stream a folder as a zip download
* delete files and empty folders
Backend is type-agnostic past the new files_routes endpoints, so the
existing mount / spec / overlayfs / expose_server_cfg pipeline is reused
unchanged. is_editable gates the row's edit affordance and the /save
content rules. Three new safe-resolve helpers (write/delete/move) cover
the new operations with the same anchor-and-resolve pattern as listing
and download. FilesBuilder is a no-op so the build subsystem can
dispatch uniformly.
Spec: docs/superpowers/specs/2026-05-09-files-overlay-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
634 lines
21 KiB
Python
634 lines
21 KiB
Python
"""Routes for the overlay 'Files' section.
|
|
|
|
Read-only endpoints (any overlay type; the caller must be an admin or the
overlay's owner, and overlays without an owner are open to any logged-in
user):

- `GET /overlays/<id>/files?path=<rel>` — HTML fragment listing one
  directory level. Used both for the initial server-rendered root and
  for HTMX swaps when a folder expands.
- `GET /overlays/<id>/files/download?path=<rel>` — streams a single file.
  Symlinks resolving anywhere under `LEFT4ME_ROOT` are allowed (so
  workshop addons stream from the shared cache); anything escaping it
  is refused.

Server runtime endpoints (only the server's owner):

- `GET /servers/<id>/files?path=<rel>` — HTML fragment listing one
  directory level of the server's merged runtime view.
- `GET /servers/<id>/files/download?path=<rel>` — streams a single file
  from that view.

Files-overlay endpoints (only `overlay.type == 'files'`, same access rule
as above; any other overlay type answers 404):

- `GET /overlays/<id>/files/content?path=` — JSON `{path, content}` for
  an editable text file, 415 if not editable.
- `POST /overlays/<id>/files/save` — JSON `{path, content, new_path?}`,
  text-mode write with an optional rename in the same call.
- `POST /overlays/<id>/files/replace` — multipart `path`, `file`,
  optional `new_path`. Binary-mode replace with an optional rename in the
  same call.
- `POST /overlays/<id>/files/upload` — multipart `target_path`, single
  `file`, optional `relative_path` (the client's `webkitRelativePath`)
  and optional `overwrite=1`. One file per request; the client fans out
  for multi-file or whole-folder drops.
- `POST /overlays/<id>/files/move` — JSON `{src, dst, overwrite?}`.
  Internal-drag rename or move; path validation (including cycle
  refusal) is delegated to `safe_resolve_for_move`.
- `POST /overlays/<id>/files/mkdir` — JSON `{path}`. Slashes allowed in
  `path`; intermediate directories are created.
- `POST /overlays/<id>/files/delete` — form `path`. Refuses non-empty
  directories.
- `GET /overlays/<id>/files/download_zip?path=` — sends a zip of the
  folder's contents.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import zipfile
|
|
|
|
from flask import (
|
|
Blueprint,
|
|
Response,
|
|
jsonify,
|
|
render_template,
|
|
request,
|
|
send_file,
|
|
)
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.auth import current_user, require_login
|
|
from l4d2web.db import session_scope
|
|
from l4d2web.models import Overlay, Server
|
|
from l4d2web.services.overlay_files import (
|
|
is_editable,
|
|
list_directory,
|
|
safe_resolve_for_delete,
|
|
safe_resolve_for_download,
|
|
safe_resolve_for_listing,
|
|
safe_resolve_for_move,
|
|
safe_resolve_for_server_download,
|
|
safe_resolve_for_server_listing,
|
|
safe_resolve_for_write,
|
|
)
|
|
|
|
|
|
# Same caps as is_editable so /save can't sneak in something the listing
|
|
# would have flagged non-editable.
|
|
_SAVE_MAX_BYTES = 1 * 1024 * 1024
|
|
# Per-upload byte cap — keeps a single hostile request from filling disk.
|
|
# 200 MiB picked generously for vpks; tune later if needed.
|
|
_UPLOAD_MAX_BYTES = 200 * 1024 * 1024
|
|
|
|
|
|
bp = Blueprint("files", __name__)
|
|
|
|
|
|
def _load_overlay_for_user(overlay_id: int, user) -> Overlay | Response:
    """Load an overlay row and check that `user` may access it.

    Returns the detached `Overlay` on success. Otherwise returns a ready-made
    `Response`: 404 when no overlay has that id, 403 when the overlay has an
    owner, that owner is someone else, and `user` is not an admin. Overlays
    without an owner (`user_id is None`) are accessible to every caller.
    """
    with session_scope() as db:
        overlay = db.scalar(select(Overlay).where(Overlay.id == overlay_id))
        if overlay is None:
            return Response(status=404)

        owned_by_someone_else = (
            overlay.user_id is not None and overlay.user_id != user.id
        )
        if owned_by_someone_else and not user.admin:
            return Response(status=403)

        # Detach the row before the session closes. Callers only read scalar
        # columns that are already populated, so lazy loads aren't a concern.
        db.expunge(overlay)
    return overlay
|
|
|
|
|
|
def _load_files_overlay(overlay_id: int, user) -> Overlay | Response:
    """Variant of `_load_overlay_for_user` restricted to `files` overlays.

    Any error response from the underlying loader is passed through
    untouched. An overlay that loads fine but whose `type` is not `files`
    is reported as 404. The mutating endpoints use this; the read-only ones
    keep working across all overlay types.
    """
    loaded = _load_overlay_for_user(overlay_id, user)
    if not isinstance(loaded, Response) and loaded.type != "files":
        return Response(status=404)
    return loaded
|
|
|
|
|
|
@bp.get("/overlays/<int:overlay_id>/files")
@require_login
def overlay_files_fragment(overlay_id: int):
    """Render one directory level of an overlay as an HTML fragment.

    The `path` query argument selects the directory relative to the overlay
    root (empty means the root itself). Responds 400 when the path is
    rejected by the resolver and 404 when it is not a directory.
    """
    user = current_user()
    assert user is not None

    overlay = _load_overlay_for_user(overlay_id, user)
    if isinstance(overlay, Response):
        return overlay

    requested = request.args.get("path", "")
    try:
        listing_dir = safe_resolve_for_listing(overlay.path, requested)
        root_dir = safe_resolve_for_listing(overlay.path, "")
    except ValueError:
        return Response("invalid path", status=400)

    if not listing_dir.is_dir():
        return Response(status=404)

    entries, truncated_count = list_directory(listing_dir, root_dir)
    template_context = {
        "overlay": overlay,
        "entries": entries,
        "truncated": truncated_count > 0,
        "truncated_count": truncated_count,
        "files_base_url": f"/overlays/{overlay_id}",
        "download_supported": True,
        # Only `files` overlays get the editing affordances in the tree.
        "files_overlay": overlay.type == "files",
    }
    return render_template("_overlay_file_tree.html", **template_context)
|
|
|
|
|
|
@bp.get("/overlays/<int:overlay_id>/files/download")
@require_login
def overlay_files_download(overlay_id: int):
    """Send a single overlay file as an attachment.

    The `path` query argument is resolved through
    `safe_resolve_for_download`; a rejected path yields 400, a missing file
    404, and a directory 400 ("not a file").
    """
    user = current_user()
    assert user is not None

    overlay = _load_overlay_for_user(overlay_id, user)
    if isinstance(overlay, Response):
        return overlay

    requested = request.args.get("path", "")
    try:
        resolved = safe_resolve_for_download(overlay.path, requested)
    except ValueError:
        return Response("invalid path", status=400)

    if not resolved.exists():
        return Response(status=404)
    if resolved.is_dir():
        return Response("not a file", status=400)

    attachment_name = os.path.basename(resolved)
    return send_file(str(resolved), as_attachment=True, download_name=attachment_name)
|
|
|
|
|
|
@bp.get("/servers/<int:server_id>/files")
@require_login
def server_files_fragment(server_id: int):
    """Render one directory level of a server's runtime merged view.

    Uses the same tree template as the overlay listing, with download links
    enabled (`download_supported=True`); those links are served by the
    `/servers/<id>/files/download` route in this module. No editing
    affordances are requested: neither `overlay` nor `files_overlay` is
    passed to the template.

    Only the server's owner may list it — the lookup filters on
    `Server.user_id == user.id`, so there is no admin bypass here. Responds
    404 for an unknown or foreign server, for a missing merged view, and
    for a path that is not a directory; 400 for a rejected path.
    """
    user = current_user()
    assert user is not None
    sub_path = request.args.get("path", "")

    with session_scope() as db:
        server = db.scalar(
            select(Server).where(Server.id == server_id, Server.user_id == user.id)
        )
        if server is None:
            return Response(status=404)

    try:
        target = safe_resolve_for_server_listing(server_id, sub_path)
        merged_root = safe_resolve_for_server_listing(server_id, "")
    except ValueError:
        return Response("invalid path", status=400)

    # The resolver can return None (presumably when the server has no merged
    # view on disk — verify against overlay_files); treat that as not found.
    if target is None or merged_root is None or not target.is_dir():
        return Response(status=404)

    entries, truncated_count = list_directory(target, merged_root)
    return render_template(
        "_overlay_file_tree.html",
        entries=entries,
        truncated=truncated_count > 0,
        truncated_count=truncated_count,
        files_base_url=f"/servers/{server_id}",
        download_supported=True,
    )
|
|
|
|
|
|
@bp.get("/overlays/<int:overlay_id>/files/content")
@require_login
def overlay_file_content(overlay_id: int):
    """Return `{path, content}` as JSON for an editable text file.

    Responds 400 for a rejected path, 404 when the path is not an existing
    file, 415 when the file is not editable (or turns out not to be valid
    UTF-8), and 500 when reading fails.
    """
    user = current_user()
    assert user is not None

    overlay = _load_files_overlay(overlay_id, user)
    if isinstance(overlay, Response):
        return overlay

    requested = request.args.get("path", "")
    try:
        file_path = safe_resolve_for_listing(overlay.path, requested)
    except ValueError:
        return Response("invalid path", status=400)

    if not (file_path.exists() and file_path.is_file()):
        return Response(status=404)
    if not is_editable(file_path):
        return Response("not editable", status=415)

    try:
        text = file_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        # is_editable sniffed only the first 8 KiB; the tail can still fail.
        return Response("not editable", status=415)
    except OSError:
        return Response("read failed", status=500)

    return jsonify({"path": requested, "content": text})
|
|
|
|
|
|
def _validate_save_content(content: str) -> Response | None:
    """Check that `content` is acceptable as the body of a saved text file.

    Returns `None` when the content is fine, otherwise the error response to
    send back:

    * 415 when the text cannot be encoded as UTF-8 (for example a lone
      surrogate, which JSON can carry as an escape such as `\\ud800`),
    * 413 when the UTF-8 encoding is larger than `_SAVE_MAX_BYTES`,
    * 415 when the text contains a NUL character.
    """
    try:
        encoded = content.encode("utf-8")
    except UnicodeEncodeError:
        # Without this guard the exception would escape as an unhandled 500.
        return Response("content is not valid UTF-8 text", status=415)
    if len(encoded) > _SAVE_MAX_BYTES:
        return Response("content exceeds 1 MiB", status=413)
    if "\x00" in content:
        return Response("content contains NUL bytes", status=415)
    return None
|
|
|
|
|
|
@bp.post("/overlays/<int:overlay_id>/files/save")
@require_login
def overlay_file_save(overlay_id: int):
    """Write text content to `path`, optionally renaming it to `new_path`.

    Request body: JSON `{path, content, new_path?}`.

    Without `new_path` (or when it equals `path`) the file is created or
    overwritten in place. With `new_path`, the existing file is written
    first and renamed afterwards, so a failed write never leaves a renamed
    file with stale content. If the rename itself fails, the new content
    stays under the old name and the request reports 500.

    Responses: 400 for a missing/ill-typed `path` or `content`; 413/415 from
    the content validation; 422 when a path is rejected by the resolver;
    404 when the rename source does not exist; 409 when the source or the
    target is not a regular file or the rename destination already exists;
    500 on filesystem errors; otherwise JSON `{path}` with the final path.
    """
    user = current_user()
    assert user is not None

    # A JSON body that is not an object (or fields that are not strings)
    # must produce a 400, not an AttributeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_path = payload.get("path")
    path = raw_path.strip() if isinstance(raw_path, str) else ""
    raw_new_path = payload.get("new_path")
    new_path = raw_new_path.strip() if isinstance(raw_new_path, str) else ""
    new_path = new_path or None
    content = payload.get("content")
    if not path or not isinstance(content, str):
        return Response("missing path or content", status=400)

    err = _validate_save_content(content)
    if err is not None:
        return err

    result = _load_files_overlay(overlay_id, user)
    if isinstance(result, Response):
        return result
    overlay = result

    # Always validate the final location as a write target, rename or not.
    try:
        write_target = safe_resolve_for_write(overlay.path, new_path or path)
    except ValueError as exc:
        return Response(str(exc), status=422)

    renaming = new_path is not None and new_path != path
    dst_path = None
    if renaming:
        try:
            src_path, dst_path = safe_resolve_for_move(overlay.path, path, new_path)
        except ValueError as exc:
            return Response(str(exc), status=422)
        if not src_path.exists() and not src_path.is_symlink():
            return Response(status=404)
        # Never write through a symlink or into a directory.
        if not _is_existing_file(src_path):
            return Response("source is not a file", status=409)
        if dst_path.exists():
            return Response("destination already exists", status=409)
        write_target = src_path
    else:
        # Anything already at the target must be a plain file; a dangling
        # symlink counts as "something there" and is refused as well.
        occupied = write_target.exists() or write_target.is_symlink()
        if occupied and not _is_existing_file(write_target):
            return Response("destination is not a file", status=409)
        try:
            write_target.parent.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            return Response(f"write failed: {exc}", status=500)

    try:
        write_target.write_text(content, encoding="utf-8")
    except OSError as exc:
        return Response(f"write failed: {exc}", status=500)

    if renaming:
        try:
            dst_path.parent.mkdir(parents=True, exist_ok=True)
            os.rename(write_target, dst_path)
        except OSError as exc:
            return Response(f"rename failed: {exc}", status=500)

    return jsonify({"path": new_path or path})
|
|
|
|
|
|
def _is_existing_file(path) -> bool:
|
|
return path.is_file() and not path.is_symlink()
|
|
|
|
|
|
@bp.post("/overlays/<int:overlay_id>/files/replace")
@require_login
def overlay_file_replace(overlay_id: int):
    """Replace the bytes of `path` with the uploaded `file`.

    Multipart fields: `path`, `file`, optional `new_path`.

    With `new_path` the upload is first streamed into the new location and
    the old path is removed only after that succeeded. A rejected or failed
    upload (for example one over the size cap) therefore leaves the original
    file untouched under its original name.

    Responses: 400 when `path` or `file` is missing; 422 when a path is
    rejected by the resolver; 404 when the rename source does not exist;
    409 when the source or target is a directory or the rename destination
    already exists; 413/500 from the upload itself; otherwise JSON `{path}`.
    """
    user = current_user()
    assert user is not None

    path = (request.form.get("path") or "").strip()
    new_path = (request.form.get("new_path") or "").strip() or None
    upload = request.files.get("file")
    if not path or upload is None:
        return Response("missing path or file", status=400)

    result = _load_files_overlay(overlay_id, user)
    if isinstance(result, Response):
        return result
    overlay = result

    if new_path and new_path != path:
        try:
            src_path, dst_path = safe_resolve_for_move(overlay.path, path, new_path)
        except ValueError as exc:
            return Response(str(exc), status=422)
        if not src_path.exists() and not src_path.is_symlink():
            return Response(status=404)
        if src_path.is_dir() and not src_path.is_symlink():
            return Response("source is not a file", status=409)
        if dst_path.exists():
            return Response("destination already exists", status=409)
        try:
            dst_path.parent.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            return Response(f"upload failed: {exc}", status=500)

        response = _stream_upload_into(upload, dst_path, new_path)
        if response.status_code != 200:
            # Nothing was renamed yet, so the original stays as it was.
            return response
        try:
            os.unlink(src_path)
        except OSError as exc:
            return Response(f"replace failed: {exc}", status=500)
        return response

    try:
        write_target = safe_resolve_for_write(overlay.path, path)
    except ValueError as exc:
        return Response(str(exc), status=422)
    # The final os.replace cannot overwrite a directory; say so up front
    # instead of failing with a 500 after the bytes were received.
    if write_target.is_dir() and not write_target.is_symlink():
        return Response("destination is not a file", status=409)
    try:
        write_target.parent.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        return Response(f"upload failed: {exc}", status=500)

    return _stream_upload_into(upload, write_target, path)
|
|
|
|
|
|
def _stream_upload_into(upload, write_target, echo_path: str) -> Response:
    """Write the multipart upload's stream into `write_target`.

    The bytes go to a `.upload-*` temporary file in the target's directory
    and are moved into place with `os.replace` only after the whole stream
    was read, so readers never observe a half-written target. The temporary
    file is removed on every path that does not end in a successful replace,
    including exceptions other than `OSError`, so a cancelled or oversized
    upload doesn't leak bytes.

    `echo_path` is what the route reports back as the canonical relative
    path for the new file (the rename target if a rename happened, else the
    original path). It is passed through verbatim.

    Returns JSON `{path: echo_path}` on success, 413 when the stream exceeds
    `_UPLOAD_MAX_BYTES`, and 500 on any filesystem error (including failure
    to create the parent directory or the temporary file).

    NOTE(review): `tempfile.mkstemp` creates the file readable and writable
    only by the creating user, and `os.replace` keeps that mode — confirm
    the game server process can read files uploaded this way.
    """
    tmp_name = None
    try:
        tmp_dir = write_target.parent
        tmp_dir.mkdir(parents=True, exist_ok=True)
        fd, tmp_name = tempfile.mkstemp(prefix=".upload-", dir=str(tmp_dir))
        bytes_seen = 0
        with os.fdopen(fd, "wb") as out:
            while True:
                chunk = upload.stream.read(64 * 1024)
                if not chunk:
                    break
                bytes_seen += len(chunk)
                if bytes_seen > _UPLOAD_MAX_BYTES:
                    raise _UploadTooLarge()
                out.write(chunk)
        os.replace(tmp_name, write_target)
        # The temp file is now the target; nothing is left to clean up.
        tmp_name = None
    except _UploadTooLarge:
        return Response("upload exceeds size cap", status=413)
    except OSError as exc:
        return Response(f"upload failed: {exc}", status=500)
    finally:
        if tmp_name is not None:
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
    return jsonify({"path": echo_path})
|
|
|
|
|
|
class _UploadTooLarge(Exception):
|
|
pass
|
|
|
|
|
|
def _join_clean(folder: str, leaf: str) -> str:
|
|
"""Join target_path + relative path safely, trimming slashes."""
|
|
folder = (folder or "").strip("/").strip()
|
|
leaf = (leaf or "").strip("/").strip()
|
|
if folder and leaf:
|
|
return f"{folder}/{leaf}"
|
|
return folder or leaf
|
|
|
|
|
|
@bp.post("/overlays/<int:overlay_id>/files/upload")
@require_login
def overlay_file_upload(overlay_id: int):
    """Single-file upload into a `files` overlay.

    Multipart fields: `target_path` (destination folder), `file`, optional
    `relative_path` (the client's `webkitRelativePath`) and optional
    `overwrite=1`. Multi-file or whole-folder drops fan out client side into
    one POST per file.

    Responses: 400 when the file, its name or the destination is missing;
    422 when the destination is rejected by the resolver; 409 when
    something already exists there and `overwrite` is not set, or when the
    destination is a directory; 413/500 from the upload itself; otherwise
    JSON `{path}`.
    """
    user = current_user()
    assert user is not None

    target_folder = (request.form.get("target_path") or "").strip()
    relative_path = (request.form.get("relative_path") or "").strip()
    overwrite = request.form.get("overwrite") == "1"
    upload = request.files.get("file")
    if upload is None:
        return Response("missing file", status=400)

    # Filename fallback for browsers that don't send relative_path.
    filename = relative_path or (upload.filename or "").strip()
    if not filename:
        return Response("missing filename", status=400)

    # Normalise DOS-style separators so backslashes act as path separators.
    filename = filename.replace("\\", "/")

    full_rel = _join_clean(target_folder, filename)
    if not full_rel:
        return Response("missing destination path", status=400)

    result = _load_files_overlay(overlay_id, user)
    if isinstance(result, Response):
        return result
    overlay = result

    try:
        write_target = safe_resolve_for_write(overlay.path, full_rel)
    except ValueError as exc:
        return Response(str(exc), status=422)

    # A dangling symlink has exists() == False but still occupies the name,
    # so it must not be replaced without the overwrite flag.
    occupied = write_target.exists() or write_target.is_symlink()
    if occupied and not overwrite:
        return Response("file already exists", status=409)
    # Even with overwrite, a file upload cannot take the place of a folder.
    if write_target.is_dir() and not write_target.is_symlink():
        return Response("destination is a directory", status=409)

    try:
        write_target.parent.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        return Response(f"upload failed: {exc}", status=500)
    return _stream_upload_into(upload, write_target, full_rel)
|
|
|
|
|
|
def _move_over_existing(src_path, dst_path) -> None:
    """Move `src_path` onto an existing `dst_path` without losing `dst_path`.

    The current destination is first parked inside a temporary `.move-*`
    directory next to it. Only after the source has been renamed into place
    is the parked copy deleted. If that rename fails, the parked destination
    is moved back and the error is re-raised. Should even the restore fail,
    the parked copy is left on disk rather than deleted.

    Raises `OSError` when any of the filesystem steps fails.
    """
    holding_dir = tempfile.mkdtemp(prefix=".move-", dir=str(dst_path.parent))
    parked = os.path.join(holding_dir, "replaced")
    try:
        os.rename(dst_path, parked)
    except OSError:
        try:
            os.rmdir(holding_dir)
        except OSError:
            pass
        raise
    try:
        os.rename(src_path, dst_path)
    except OSError:
        os.rename(parked, dst_path)
        try:
            os.rmdir(holding_dir)
        except OSError:
            pass
        raise
    # The move succeeded; dropping the old destination is best-effort.
    shutil.rmtree(holding_dir, ignore_errors=True)


@bp.post("/overlays/<int:overlay_id>/files/move")
@require_login
def overlay_file_move(overlay_id: int):
    """Rename / move a file or folder inside a `files` overlay.

    Request body: JSON `{src, dst, overwrite?}`.

    Responses: 400 when `src` or `dst` is missing or not a string; 422 when
    the pair is rejected by `safe_resolve_for_move`; 404 when the source
    does not exist; 409 when the destination exists and `overwrite` is not
    set; 500 on filesystem errors; otherwise JSON `{src, dst}`.

    With `overwrite`, the existing destination is replaced only once the
    source has actually been moved into place, so a failed move does not
    destroy the destination. Moving a path onto itself is a no-op.
    """
    user = current_user()
    assert user is not None

    # A JSON body that is not an object (or fields that are not strings)
    # must produce a 400, not an AttributeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_src = payload.get("src")
    raw_dst = payload.get("dst")
    src = raw_src.strip() if isinstance(raw_src, str) else ""
    dst = raw_dst.strip() if isinstance(raw_dst, str) else ""
    overwrite = bool(payload.get("overwrite"))
    if not src or not dst:
        return Response("missing src or dst", status=400)

    result = _load_files_overlay(overlay_id, user)
    if isinstance(result, Response):
        return result
    overlay = result

    try:
        src_path, dst_path = safe_resolve_for_move(overlay.path, src, dst)
    except ValueError as exc:
        return Response(str(exc), status=422)

    # Check the source before touching the destination: with overwrite set,
    # a missing source must not cost the user the existing destination.
    if not src_path.exists() and not src_path.is_symlink():
        return Response(status=404)

    if dst_path.exists() and not overwrite:
        return Response("destination already exists", status=409)

    if src_path == dst_path:
        # Overwriting a path with itself: nothing to move, nothing to delete.
        return jsonify({"src": src, "dst": dst})

    try:
        dst_path.parent.mkdir(parents=True, exist_ok=True)
        if dst_path.exists():
            _move_over_existing(src_path, dst_path)
        else:
            os.rename(src_path, dst_path)
    except OSError as exc:
        return Response(f"move failed: {exc}", status=500)

    return jsonify({"src": src, "dst": dst})
|
|
|
|
|
|
@bp.post("/overlays/<int:overlay_id>/files/mkdir")
@require_login
def overlay_file_mkdir(overlay_id: int):
    """Create the empty directory `path`; slashes create intermediates.

    Request body: JSON `{path}`. Surrounding whitespace and slashes are
    trimmed from `path` before use.

    Responses: 400 when `path` is missing or not a string; 422 when it is
    rejected by the resolver; 409 when something other than a real
    directory already exists there; 500 on filesystem errors; otherwise
    JSON `{path}`. Creating a directory that already exists succeeds.
    """
    user = current_user()
    assert user is not None

    # A JSON body that is not an object (or a non-string path) must produce
    # a 400, not an AttributeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_path = payload.get("path")
    # Trim whitespace first so a slash hidden behind it is removed as well.
    path = raw_path.strip().strip("/").strip() if isinstance(raw_path, str) else ""
    if not path:
        return Response("missing path", status=400)

    result = _load_files_overlay(overlay_id, user)
    if isinstance(result, Response):
        return result
    overlay = result

    try:
        target = safe_resolve_for_write(overlay.path, path)
    except ValueError as exc:
        return Response(str(exc), status=422)

    if target.exists():
        if target.is_dir() and not target.is_symlink():
            # Idempotent — folder already there.
            return jsonify({"path": path})
        return Response("destination already exists and is not a directory", status=409)

    try:
        target.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        # Something appeared at the target between the check and the mkdir.
        return Response("directory already exists", status=409)
    except OSError as exc:
        return Response(f"mkdir failed: {exc}", status=500)

    return jsonify({"path": path})
|
|
|
|
|
|
@bp.post("/overlays/<int:overlay_id>/files/delete")
@require_login
def overlay_file_delete(overlay_id: int):
    """Delete a file or an empty folder. Never removes directories recursively.

    Form field: `path`.

    Responses: 400 when `path` is missing; 422 when it is rejected by the
    resolver; 404 when nothing exists there (a dangling symlink counts as
    existing and is unlinked); 409 when the directory is not empty; 500 on
    any other filesystem error; otherwise JSON `{path}`.
    """
    import errno

    user = current_user()
    assert user is not None

    path = (request.form.get("path") or "").strip()
    if not path:
        return Response("missing path", status=400)

    result = _load_files_overlay(overlay_id, user)
    if isinstance(result, Response):
        return result
    overlay = result

    try:
        target = safe_resolve_for_delete(overlay.path, path)
    except ValueError as exc:
        return Response(str(exc), status=422)

    if not target.exists() and not target.is_symlink():
        return Response(status=404)

    # Symlinks to directories are unlinked, never descended into.
    removing_directory = target.is_dir() and not target.is_symlink()
    try:
        if removing_directory:
            target.rmdir()
        else:
            os.unlink(target)
    except OSError as exc:
        # Only a genuinely non-empty directory is a conflict (POSIX allows
        # either errno for it); permission or I/O problems are server errors.
        if removing_directory and exc.errno in (errno.ENOTEMPTY, errno.EEXIST):
            return Response("directory is not empty", status=409)
        return Response(f"delete failed: {exc}", status=500)

    return jsonify({"path": path})
|
|
|
|
|
|
@bp.get("/overlays/<int:overlay_id>/files/download_zip")
@require_login
def overlay_file_download_zip(overlay_id: int):
    """Send a zip of the folder at `path` (or of the overlay root).

    The archive is assembled in an anonymous temporary file instead of in
    memory, because a single upload may be as large as `_UPLOAD_MAX_BYTES`.
    Directory symlinks are not descended into. Every file is resolved
    through `safe_resolve_for_download` before it is added, so symlinked
    files are stored as their resolved content under the same rules as the
    single-file download; entries the resolver rejects, and entries that
    cannot be read, are skipped. Empty directories are stored as explicit
    entries so the structure round-trips.

    Responses: 400 for a rejected path, 404 when the path is not an
    existing directory, otherwise the zip as an attachment named after the
    folder.
    """
    user = current_user()
    assert user is not None

    sub_path = request.args.get("path", "")

    result = _load_files_overlay(overlay_id, user)
    if isinstance(result, Response):
        return result
    overlay = result

    try:
        target = safe_resolve_for_listing(overlay.path, sub_path)
        overlay_root = safe_resolve_for_listing(overlay.path, "")
    except ValueError:
        return Response("invalid path", status=400)

    if not target.exists() or not target.is_dir():
        return Response(status=404)

    folder_name = os.path.basename(str(target)) or "overlay"
    download_name = f"{folder_name}.zip"

    archive = tempfile.TemporaryFile()
    try:
        with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for root, dirs, files in os.walk(target, followlinks=False):
                for name in files:
                    abs_path = os.path.join(root, name)
                    arcname = os.path.relpath(abs_path, str(target))
                    # The resolver takes paths relative to the overlay root.
                    rel_to_root = os.path.relpath(abs_path, str(overlay_root))
                    try:
                        real = safe_resolve_for_download(overlay.path, rel_to_root)
                        zf.write(str(real), arcname=arcname)
                    except (OSError, ValueError):
                        continue
                # Include empty directories so the structure round-trips.
                for name in dirs:
                    abs_dir = os.path.join(root, name)
                    try:
                        with os.scandir(abs_dir) as children:
                            is_empty = next(children, None) is None
                    except OSError:
                        continue
                    if is_empty:
                        rel_dir = os.path.relpath(abs_dir, str(target)) + "/"
                        zf.writestr(zipfile.ZipInfo(rel_dir), b"")
        archive_size = archive.tell()
        archive.seek(0)
    except BaseException:
        archive.close()
        raise

    response = send_file(
        archive,
        mimetype="application/zip",
        as_attachment=True,
        download_name=download_name,
    )
    # send_file cannot size an arbitrary file object, so set the length
    # explicitly to keep download progress working in the browser.
    response.content_length = archive_size
    return response
|
|
|
|
|
|
@bp.get("/servers/<int:server_id>/files/download")
@require_login
def server_files_download(server_id: int):
    """Send a single file from a server's runtime view as an attachment.

    Only the server's owner may download (the lookup filters on
    `Server.user_id == user.id`). Responds 404 for an unknown or foreign
    server and for a missing file, 400 for a rejected path or a directory.
    """
    user = current_user()
    assert user is not None
    requested = request.args.get("path", "")

    with session_scope() as db:
        owned_server = db.scalar(
            select(Server).where(Server.id == server_id, Server.user_id == user.id)
        )
        if owned_server is None:
            return Response(status=404)

    try:
        resolved = safe_resolve_for_server_download(server_id, requested)
    except ValueError:
        return Response("invalid path", status=400)

    # The resolver may hand back None instead of a path; treat that the same
    # as a file that does not exist.
    if resolved is None or not resolved.exists():
        return Response(status=404)
    if resolved.is_dir():
        return Response("not a file", status=400)

    attachment_name = os.path.basename(resolved)
    return send_file(str(resolved), as_attachment=True, download_name=attachment_name)
|