81 lines
2.4 KiB
Python
81 lines
2.4 KiB
Python
import time
|
|
|
|
from flask import Blueprint, Response, request, redirect
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.auth import hash_password, login_user, logout_user, verify_password
|
|
from l4d2web.db import session_scope
|
|
from l4d2web.models import User
|
|
|
|
|
|
bp = Blueprint("auth", __name__)
|
|
LOGIN_RATE_LIMIT_WINDOW_SECONDS = 60
|
|
LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 20
|
|
LOGIN_ATTEMPTS_BY_IP: dict[str, list[float]] = {}
|
|
|
|
|
|
def reset_login_rate_limits() -> None:
|
|
LOGIN_ATTEMPTS_BY_IP.clear()
|
|
|
|
|
|
def is_login_rate_limited(remote_addr: str) -> bool:
|
|
now = time.time()
|
|
attempts = LOGIN_ATTEMPTS_BY_IP.setdefault(remote_addr, [])
|
|
cutoff = now - LOGIN_RATE_LIMIT_WINDOW_SECONDS
|
|
attempts[:] = [ts for ts in attempts if ts >= cutoff]
|
|
if len(attempts) >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS:
|
|
return True
|
|
attempts.append(now)
|
|
return False
|
|
|
|
|
|
@bp.get("/signup")
|
|
def signup_form() -> Response:
|
|
return Response("signup", mimetype="text/plain")
|
|
|
|
|
|
@bp.post("/signup")
|
|
def signup() -> Response:
|
|
username = request.form.get("username", "").strip()
|
|
password = request.form.get("password", "")
|
|
if not username or not password:
|
|
return Response("missing credentials", status=400)
|
|
|
|
with session_scope() as db:
|
|
existing = db.scalar(select(User).where(User.username == username))
|
|
if existing is not None:
|
|
return Response("username already exists", status=409)
|
|
user = User(username=username, password_digest=hash_password(password), admin=False)
|
|
db.add(user)
|
|
db.flush()
|
|
login_user(user.id)
|
|
|
|
return redirect("/dashboard")
|
|
|
|
|
|
@bp.get("/login")
|
|
def login_form() -> Response:
|
|
return Response("login", mimetype="text/plain")
|
|
|
|
|
|
@bp.post("/login")
|
|
def login() -> Response:
|
|
remote_addr = request.remote_addr or "unknown"
|
|
if is_login_rate_limited(remote_addr):
|
|
return Response("too many login attempts", status=429)
|
|
|
|
username = request.form.get("username", "").strip()
|
|
password = request.form.get("password", "")
|
|
with session_scope() as db:
|
|
user = db.scalar(select(User).where(User.username == username))
|
|
if user is None or not verify_password(password, user.password_digest):
|
|
return Response("invalid credentials", status=401)
|
|
login_user(user.id)
|
|
LOGIN_ATTEMPTS_BY_IP.pop(remote_addr, None)
|
|
return redirect("/dashboard")
|
|
|
|
|
|
@bp.post("/logout")
|
|
def logout() -> Response:
|
|
logout_user()
|
|
return redirect("/login")
|