feat(l4d2-web): add public auth and admin bootstrap command
This commit is contained in:
parent
4e9c0172ef
commit
a516402163
6 changed files with 202 additions and 0 deletions
|
|
@ -1,6 +1,12 @@
|
||||||
|
import os
|
||||||
|
|
||||||
from flask import Flask, jsonify
|
from flask import Flask, jsonify
|
||||||
|
|
||||||
|
from l4d2web.auth import load_current_user
|
||||||
|
from l4d2web.cli import register_cli
|
||||||
from l4d2web.config import DEFAULT_CONFIG
|
from l4d2web.config import DEFAULT_CONFIG
|
||||||
|
from l4d2web.db import init_db
|
||||||
|
from l4d2web.routes.auth_routes import bp as auth_bp
|
||||||
|
|
||||||
|
|
||||||
def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
||||||
|
|
@ -9,8 +15,19 @@ def create_app(test_config: dict[str, object] | None = None) -> Flask:
|
||||||
if test_config is not None:
|
if test_config is not None:
|
||||||
app.config.update(test_config)
|
app.config.update(test_config)
|
||||||
|
|
||||||
|
os.environ["DATABASE_URL"] = str(app.config["DATABASE_URL"])
|
||||||
|
init_db()
|
||||||
|
|
||||||
|
app.before_request(load_current_user)
|
||||||
|
app.register_blueprint(auth_bp)
|
||||||
|
register_cli(app)
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
def health():
|
def health():
|
||||||
return jsonify({"status": "ok"})
|
return jsonify({"status": "ok"})
|
||||||
|
|
||||||
|
@app.get("/")
|
||||||
|
def root():
|
||||||
|
return jsonify({"status": "ok"})
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
|
||||||
64
components/l4d2-web-app/src/l4d2web/auth.py
Normal file
64
components/l4d2-web-app/src/l4d2web/auth.py
Normal file
|
|
@ -0,0 +1,64 @@
|
||||||
|
from functools import wraps
|
||||||
|
from typing import Callable, TypeVar
|
||||||
|
|
||||||
|
from flask import abort, g, redirect, session
|
||||||
|
from sqlalchemy import select
|
||||||
|
from werkzeug.security import check_password_hash, generate_password_hash
|
||||||
|
|
||||||
|
from l4d2web.db import session_scope
|
||||||
|
from l4d2web.models import User
|
||||||
|
|
||||||
|
|
||||||
|
F = TypeVar("F", bound=Callable)
|
||||||
|
|
||||||
|
|
||||||
|
def hash_password(raw: str) -> str:
|
||||||
|
return generate_password_hash(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def verify_password(raw: str, digest: str) -> bool:
|
||||||
|
return check_password_hash(digest, raw)
|
||||||
|
|
||||||
|
|
||||||
|
def load_current_user() -> None:
|
||||||
|
user_id = session.get("user_id")
|
||||||
|
if user_id is None:
|
||||||
|
g.user = None
|
||||||
|
return
|
||||||
|
with session_scope() as db:
|
||||||
|
g.user = db.scalar(select(User).where(User.id == int(user_id)))
|
||||||
|
|
||||||
|
|
||||||
|
def current_user() -> User | None:
|
||||||
|
return getattr(g, "user", None)
|
||||||
|
|
||||||
|
|
||||||
|
def login_user(user_id: int) -> None:
|
||||||
|
session["user_id"] = user_id
|
||||||
|
|
||||||
|
|
||||||
|
def logout_user() -> None:
|
||||||
|
session.pop("user_id", None)
|
||||||
|
|
||||||
|
|
||||||
|
def require_login(func: F) -> F:
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
if current_user() is None:
|
||||||
|
return redirect("/login")
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper # type: ignore[return-value]
|
||||||
|
|
||||||
|
|
||||||
|
def require_admin(func: F) -> F:
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
user = current_user()
|
||||||
|
if user is None:
|
||||||
|
return redirect("/login")
|
||||||
|
if not user.admin:
|
||||||
|
abort(403)
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper # type: ignore[return-value]
|
||||||
19
components/l4d2-web-app/src/l4d2web/cli.py
Normal file
19
components/l4d2-web-app/src/l4d2web/cli.py
Normal file
|
|
@ -0,0 +1,19 @@
|
||||||
|
import click
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
from l4d2web.db import session_scope
|
||||||
|
from l4d2web.models import User
|
||||||
|
|
||||||
|
|
||||||
|
@click.command("promote-admin")
|
||||||
|
@click.argument("username")
|
||||||
|
def promote_admin(username: str) -> None:
|
||||||
|
with session_scope() as db:
|
||||||
|
user = db.scalar(select(User).where(User.username == username))
|
||||||
|
if user is None:
|
||||||
|
raise click.ClickException("user not found")
|
||||||
|
user.admin = True
|
||||||
|
|
||||||
|
|
||||||
|
def register_cli(app) -> None:
|
||||||
|
app.cli.add_command(promote_admin)
|
||||||
0
components/l4d2-web-app/src/l4d2web/routes/__init__.py
Normal file
0
components/l4d2-web-app/src/l4d2web/routes/__init__.py
Normal file
56
components/l4d2-web-app/src/l4d2web/routes/auth_routes.py
Normal file
56
components/l4d2-web-app/src/l4d2web/routes/auth_routes.py
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
from flask import Blueprint, Response, request, redirect
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
from l4d2web.auth import hash_password, login_user, logout_user, verify_password
|
||||||
|
from l4d2web.db import session_scope
|
||||||
|
from l4d2web.models import User
|
||||||
|
|
||||||
|
|
||||||
|
bp = Blueprint("auth", __name__)
|
||||||
|
|
||||||
|
|
||||||
|
@bp.get("/signup")
|
||||||
|
def signup_form() -> Response:
|
||||||
|
return Response("signup", mimetype="text/plain")
|
||||||
|
|
||||||
|
|
||||||
|
@bp.post("/signup")
|
||||||
|
def signup() -> Response:
|
||||||
|
username = request.form.get("username", "").strip()
|
||||||
|
password = request.form.get("password", "")
|
||||||
|
if not username or not password:
|
||||||
|
return Response("missing credentials", status=400)
|
||||||
|
|
||||||
|
with session_scope() as db:
|
||||||
|
existing = db.scalar(select(User).where(User.username == username))
|
||||||
|
if existing is not None:
|
||||||
|
return Response("username already exists", status=409)
|
||||||
|
user = User(username=username, password_digest=hash_password(password), admin=False)
|
||||||
|
db.add(user)
|
||||||
|
db.flush()
|
||||||
|
login_user(user.id)
|
||||||
|
|
||||||
|
return redirect("/dashboard")
|
||||||
|
|
||||||
|
|
||||||
|
@bp.get("/login")
|
||||||
|
def login_form() -> Response:
|
||||||
|
return Response("login", mimetype="text/plain")
|
||||||
|
|
||||||
|
|
||||||
|
@bp.post("/login")
|
||||||
|
def login() -> Response:
|
||||||
|
username = request.form.get("username", "").strip()
|
||||||
|
password = request.form.get("password", "")
|
||||||
|
with session_scope() as db:
|
||||||
|
user = db.scalar(select(User).where(User.username == username))
|
||||||
|
if user is None or not verify_password(password, user.password_digest):
|
||||||
|
return Response("invalid credentials", status=401)
|
||||||
|
login_user(user.id)
|
||||||
|
return redirect("/dashboard")
|
||||||
|
|
||||||
|
|
||||||
|
@bp.post("/logout")
|
||||||
|
def logout() -> Response:
|
||||||
|
logout_user()
|
||||||
|
return redirect("/login")
|
||||||
46
components/l4d2-web-app/tests/test_auth.py
Normal file
46
components/l4d2-web-app/tests/test_auth.py
Normal file
|
|
@ -0,0 +1,46 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from l4d2web.app import create_app
|
||||||
|
from l4d2web.auth import hash_password
|
||||||
|
from l4d2web.db import init_db, session_scope
|
||||||
|
from l4d2web.models import User
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client(tmp_path, monkeypatch):
|
||||||
|
db_url = f"sqlite:///{tmp_path/'auth.db'}"
|
||||||
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||||
|
init_db()
|
||||||
|
return app.test_client()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def seed_user(tmp_path, monkeypatch):
|
||||||
|
db_url = f"sqlite:///{tmp_path/'seed.db'}"
|
||||||
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
||||||
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
||||||
|
init_db()
|
||||||
|
with app.app_context():
|
||||||
|
with session_scope() as session:
|
||||||
|
user = User(username="alice", password_digest=hash_password("secret"), admin=False)
|
||||||
|
session.add(user)
|
||||||
|
session.flush()
|
||||||
|
user_id = user.id
|
||||||
|
return user_id
|
||||||
|
|
||||||
|
|
||||||
|
def test_public_signup(client) -> None:
|
||||||
|
response = client.post("/signup", data={"username": "alice", "password": "secret"})
|
||||||
|
assert response.status_code == 302
|
||||||
|
|
||||||
|
|
||||||
|
def test_login_sets_session(client) -> None:
|
||||||
|
with session_scope() as session:
|
||||||
|
session.add(User(username="alice", password_digest=hash_password("secret"), admin=False))
|
||||||
|
|
||||||
|
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
||||||
|
assert response.status_code == 302
|
||||||
|
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
assert sess.get("user_id") is not None
|
||||||
Loading…
Reference in a new issue