diff --git a/components/l4d2-web-app/src/l4d2web/app.py b/components/l4d2-web-app/src/l4d2web/app.py index 5f955d2..ea1417e 100644 --- a/components/l4d2-web-app/src/l4d2web/app.py +++ b/components/l4d2-web-app/src/l4d2web/app.py @@ -1,6 +1,12 @@ +import os + from flask import Flask, jsonify +from l4d2web.auth import load_current_user +from l4d2web.cli import register_cli from l4d2web.config import DEFAULT_CONFIG +from l4d2web.db import init_db +from l4d2web.routes.auth_routes import bp as auth_bp def create_app(test_config: dict[str, object] | None = None) -> Flask: @@ -9,8 +15,19 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask: if test_config is not None: app.config.update(test_config) + os.environ["DATABASE_URL"] = str(app.config["DATABASE_URL"]) + init_db() + + app.before_request(load_current_user) + app.register_blueprint(auth_bp) + register_cli(app) + @app.get("/health") def health(): return jsonify({"status": "ok"}) + @app.get("/") + def root(): + return jsonify({"status": "ok"}) + return app diff --git a/components/l4d2-web-app/src/l4d2web/auth.py b/components/l4d2-web-app/src/l4d2web/auth.py new file mode 100644 index 0000000..88ecc46 --- /dev/null +++ b/components/l4d2-web-app/src/l4d2web/auth.py @@ -0,0 +1,64 @@ +from functools import wraps +from typing import Callable, TypeVar + +from flask import abort, g, redirect, session +from sqlalchemy import select +from werkzeug.security import check_password_hash, generate_password_hash + +from l4d2web.db import session_scope +from l4d2web.models import User + + +F = TypeVar("F", bound=Callable) + + +def hash_password(raw: str) -> str: + return generate_password_hash(raw) + + +def verify_password(raw: str, digest: str) -> bool: + return check_password_hash(digest, raw) + + +def load_current_user() -> None: + user_id = session.get("user_id") + if user_id is None: + g.user = None + return + with session_scope() as db: + g.user = db.scalar(select(User).where(User.id == int(user_id))) + + +def current_user() -> User | None: + return getattr(g, "user", None) + + +def login_user(user_id: int) -> None: + session["user_id"] = user_id + + +def logout_user() -> None: + session.pop("user_id", None) + + +def require_login(func: F) -> F: + @wraps(func) + def wrapper(*args, **kwargs): + if current_user() is None: + return redirect("/login") + return func(*args, **kwargs) + + return wrapper # type: ignore[return-value] + + +def require_admin(func: F) -> F: + @wraps(func) + def wrapper(*args, **kwargs): + user = current_user() + if user is None: + return redirect("/login") + if not user.admin: + abort(403) + return func(*args, **kwargs) + + return wrapper # type: ignore[return-value] diff --git a/components/l4d2-web-app/src/l4d2web/cli.py b/components/l4d2-web-app/src/l4d2web/cli.py new file mode 100644 index 0000000..bc3494a --- /dev/null +++ b/components/l4d2-web-app/src/l4d2web/cli.py @@ -0,0 +1,19 @@ +import click +from sqlalchemy import select + +from l4d2web.db import session_scope +from l4d2web.models import User + + +@click.command("promote-admin") +@click.argument("username") +def promote_admin(username: str) -> None: + with session_scope() as db: + user = db.scalar(select(User).where(User.username == username)) + if user is None: + raise click.ClickException("user not found") + user.admin = True + + +def register_cli(app) -> None: + app.cli.add_command(promote_admin) diff --git a/components/l4d2-web-app/src/l4d2web/routes/__init__.py b/components/l4d2-web-app/src/l4d2web/routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/components/l4d2-web-app/src/l4d2web/routes/auth_routes.py b/components/l4d2-web-app/src/l4d2web/routes/auth_routes.py new file mode 100644 index 0000000..19cbf80 --- /dev/null +++ b/components/l4d2-web-app/src/l4d2web/routes/auth_routes.py @@ -0,0 +1,56 @@ +from flask import Blueprint, Response, request, redirect +from sqlalchemy import select + +from l4d2web.auth import hash_password, login_user, logout_user, verify_password +from l4d2web.db import session_scope +from l4d2web.models import User + + +bp = Blueprint("auth", __name__) + + +@bp.get("/signup") +def signup_form() -> Response: + return Response("signup", mimetype="text/plain") + + +@bp.post("/signup") +def signup() -> Response: + username = request.form.get("username", "").strip() + password = request.form.get("password", "") + if not username or not password: + return Response("missing credentials", status=400) + + with session_scope() as db: + existing = db.scalar(select(User).where(User.username == username)) + if existing is not None: + return Response("username already exists", status=409) + user = User(username=username, password_digest=hash_password(password), admin=False) + db.add(user) + db.flush() + login_user(user.id) + + return redirect("/dashboard") + + +@bp.get("/login") +def login_form() -> Response: + return Response("login", mimetype="text/plain") + + +@bp.post("/login") +def login() -> Response: + username = request.form.get("username", "").strip() + password = request.form.get("password", "") + with session_scope() as db: + user = db.scalar(select(User).where(User.username == username)) + if user is None or not verify_password(password, user.password_digest): + return Response("invalid credentials", status=401) + login_user(user.id) + return redirect("/dashboard") + + +@bp.post("/logout") +def logout() -> Response: + logout_user() + return redirect("/login") diff --git a/components/l4d2-web-app/tests/test_auth.py b/components/l4d2-web-app/tests/test_auth.py new file mode 100644 index 0000000..3408c4a --- /dev/null +++ b/components/l4d2-web-app/tests/test_auth.py @@ -0,0 +1,46 @@ +import pytest + +from l4d2web.app import create_app +from l4d2web.auth import hash_password +from l4d2web.db import init_db, session_scope +from l4d2web.models import User + + +@pytest.fixture +def client(tmp_path, monkeypatch): + db_url = f"sqlite:///{tmp_path/'auth.db'}" + monkeypatch.setenv("DATABASE_URL", db_url) + app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) + init_db() + return app.test_client() + + +@pytest.fixture +def seed_user(tmp_path, monkeypatch): + db_url = f"sqlite:///{tmp_path/'seed.db'}" + monkeypatch.setenv("DATABASE_URL", db_url) + app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"}) + init_db() + with app.app_context(): + with session_scope() as session: + user = User(username="alice", password_digest=hash_password("secret"), admin=False) + session.add(user) + session.flush() + user_id = user.id + return user_id + + +def test_public_signup(client) -> None: + response = client.post("/signup", data={"username": "alice", "password": "secret"}) + assert response.status_code == 302 + + +def test_login_sets_session(client) -> None: + with session_scope() as session: + session.add(User(username="alice", password_digest=hash_password("secret"), admin=False)) + + response = client.post("/login", data={"username": "alice", "password": "secret"}) + assert response.status_code == 302 + + with client.session_transaction() as sess: + assert sess.get("user_id") is not None