- login_user clears any pre-login session state before stamping user_id/pw_changed_at/admin so a fixated cookie value cannot smuggle data past the login boundary - logout_user now session.clear()s instead of only popping user_id, removing leftover pw_changed_at/admin markers - CSRF token comparison uses hmac.compare_digest - load_current_user rejects sessions where the stamped admin flag no longer matches the user row, preventing a demoted admin from retaining elevated access until next password change (backward-compatible: sessions issued pre-upgrade lack the marker and pass through until next login)
346 lines
12 KiB
Python
346 lines
12 KiB
Python
import pytest
|
|
from sqlalchemy import select
|
|
|
|
from l4d2web.app import create_app
|
|
from l4d2web.auth import hash_password
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import User
|
|
|
|
|
|
@pytest.fixture
|
|
def client(tmp_path, monkeypatch):
|
|
db_url = f"sqlite:///{tmp_path/'auth.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
return app.test_client()
|
|
|
|
|
|
@pytest.fixture
|
|
def seed_user(tmp_path, monkeypatch):
|
|
db_url = f"sqlite:///{tmp_path/'seed.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
with app.app_context():
|
|
with session_scope() as session:
|
|
user = User(username="alice", password_digest=hash_password("secret"), admin=False)
|
|
session.add(user)
|
|
session.flush()
|
|
user_id = user.id
|
|
return user_id
|
|
|
|
|
|
def test_login_page_renders_form(client) -> None:
|
|
response = client.get("/login")
|
|
text = response.get_data(as_text=True)
|
|
|
|
assert response.status_code == 200
|
|
assert '<form method="post" action="/login"' in text
|
|
assert 'name="username"' in text
|
|
assert 'name="password"' in text
|
|
assert "signup" not in text.lower()
|
|
|
|
|
|
def test_login_page_drops_unsafe_encoded_next(client) -> None:
|
|
response = client.get("/login?next=/%5Cevil.com")
|
|
text = response.get_data(as_text=True)
|
|
|
|
assert response.status_code == 200
|
|
assert 'name="next"' not in text
|
|
assert "evil.com" not in text
|
|
|
|
|
|
def test_login_calls_verify_password_for_unknown_user(client, monkeypatch) -> None:
|
|
calls: list[tuple[str, str]] = []
|
|
|
|
def tracking_verify(raw: str, digest: str) -> bool:
|
|
calls.append((raw, digest))
|
|
return False
|
|
|
|
monkeypatch.setattr("l4d2web.routes.auth_routes.verify_password", tracking_verify)
|
|
|
|
response = client.post("/login", data={"username": "ghost", "password": "guess"})
|
|
|
|
assert response.status_code == 401
|
|
assert len(calls) == 1, "verify_password must run on unknown users to equalize timing"
|
|
assert calls[0][0] == "guess"
|
|
|
|
|
|
def test_signup_routes_are_gone(client) -> None:
|
|
assert client.get("/signup").status_code == 404
|
|
assert client.post("/signup", data={"username": "alice", "password": "secret"}).status_code == 404
|
|
|
|
|
|
def test_login_redirects_to_safe_next(client) -> None:
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret"), admin=False))
|
|
|
|
response = client.post(
|
|
"/login",
|
|
data={"username": "alice", "password": "secret", "next": "/servers"},
|
|
)
|
|
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/servers")
|
|
|
|
|
|
def test_login_ignores_unsafe_next(client) -> None:
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret"), admin=False))
|
|
|
|
response = client.post(
|
|
"/login",
|
|
data={"username": "alice", "password": "secret", "next": "https://example.com/steal"},
|
|
)
|
|
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/dashboard")
|
|
|
|
|
|
def test_login_ignores_backslash_next(client) -> None:
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret"), admin=False))
|
|
|
|
response = client.post(
|
|
"/login",
|
|
data={"username": "alice", "password": "secret", "next": "/\\evil.com"},
|
|
)
|
|
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/dashboard")
|
|
|
|
|
|
def test_login_ignores_percent_encoded_backslash_next(client) -> None:
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret"), admin=False))
|
|
|
|
response = client.post(
|
|
"/login",
|
|
data={"username": "alice", "password": "secret", "next": "/%5Cevil.com"},
|
|
)
|
|
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/dashboard")
|
|
|
|
|
|
def test_login_sets_session(client) -> None:
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret"), admin=False))
|
|
|
|
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
|
assert response.status_code == 302
|
|
|
|
with client.session_transaction() as sess:
|
|
assert sess.get("user_id") is not None
|
|
|
|
|
|
def test_load_current_user_rejects_missing_marker(client) -> None:
|
|
with session_scope() as db:
|
|
u = User(username="alice", password_digest=hash_password("secret"))
|
|
db.add(u)
|
|
db.flush()
|
|
uid = u.id
|
|
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = uid
|
|
|
|
response = client.get("/dashboard")
|
|
assert response.status_code == 302
|
|
assert "/login" in response.headers["Location"]
|
|
|
|
|
|
def test_load_current_user_rejects_stale_marker(client) -> None:
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
with session_scope() as db:
|
|
u = User(username="alice", password_digest=hash_password("secret"))
|
|
db.add(u)
|
|
db.flush()
|
|
uid = u.id
|
|
|
|
stale = datetime.now(UTC).replace(tzinfo=None) - timedelta(minutes=5)
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = uid
|
|
sess["pw_changed_at"] = stale.isoformat()
|
|
|
|
response = client.get("/dashboard")
|
|
assert response.status_code == 302
|
|
assert "/login" in response.headers["Location"]
|
|
|
|
|
|
def test_load_current_user_accepts_current_marker(client) -> None:
|
|
with session_scope() as db:
|
|
u = User(username="alice", password_digest=hash_password("secret"))
|
|
db.add(u)
|
|
db.flush()
|
|
uid = u.id
|
|
marker = u.password_changed_at.isoformat()
|
|
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = uid
|
|
sess["pw_changed_at"] = marker
|
|
|
|
response = client.get("/dashboard")
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_login_stamps_password_changed_at_in_session(client) -> None:
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret")))
|
|
|
|
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
|
assert response.status_code == 302
|
|
|
|
with client.session_transaction() as sess:
|
|
marker = sess.get("pw_changed_at")
|
|
assert marker is not None
|
|
with session_scope() as session:
|
|
user = session.query(User).filter_by(username="alice").one()
|
|
assert marker == user.password_changed_at.isoformat()
|
|
|
|
|
|
def test_login_clears_pre_login_session_state(client) -> None:
|
|
"""Pre-login session keys must not survive into the authenticated session,
|
|
so a fixated cookie value cannot smuggle state past the login boundary."""
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret")))
|
|
|
|
with client.session_transaction() as sess:
|
|
sess["pre_login_marker"] = "smuggled"
|
|
|
|
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
|
assert response.status_code == 302
|
|
|
|
with client.session_transaction() as sess:
|
|
assert sess.get("user_id") is not None
|
|
assert "pre_login_marker" not in sess
|
|
|
|
|
|
def test_logout_clears_session_completely(client) -> None:
|
|
"""logout_user must drop pw_changed_at alongside user_id so no auth
|
|
markers linger after sign-out."""
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret")))
|
|
|
|
client.post("/login", data={"username": "alice", "password": "secret"})
|
|
with client.session_transaction() as sess:
|
|
assert sess.get("user_id") is not None
|
|
assert sess.get("pw_changed_at") is not None
|
|
sess["csrf_token"] = "test-token"
|
|
|
|
client.post("/logout", data={"csrf_token": "test-token"})
|
|
|
|
with client.session_transaction() as sess:
|
|
assert "user_id" not in sess
|
|
assert "pw_changed_at" not in sess
|
|
|
|
|
|
def test_load_current_user_rejects_role_change(client) -> None:
|
|
"""If a user's admin flag changes after login, the existing session must
|
|
be rejected on the next request — preventing demoted admins from
|
|
retaining elevated access until their next password change."""
|
|
with session_scope() as db:
|
|
u = User(username="alice", password_digest=hash_password("secret"), admin=True)
|
|
db.add(u)
|
|
db.flush()
|
|
uid = u.id
|
|
marker = u.password_changed_at.isoformat()
|
|
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = uid
|
|
sess["pw_changed_at"] = marker
|
|
sess["admin"] = True
|
|
|
|
# Demote outside the session.
|
|
with session_scope() as db:
|
|
target = db.scalar(select(User).where(User.id == uid))
|
|
target.admin = False
|
|
|
|
response = client.get("/dashboard")
|
|
assert response.status_code == 302
|
|
assert "/login" in response.headers["Location"]
|
|
|
|
|
|
def test_login_stamps_admin_flag_in_session(client) -> None:
|
|
with session_scope() as session:
|
|
session.add(User(username="alice", password_digest=hash_password("secret"), admin=True))
|
|
|
|
response = client.post("/login", data={"username": "alice", "password": "secret"})
|
|
assert response.status_code == 302
|
|
|
|
with client.session_transaction() as sess:
|
|
assert sess.get("admin") is True
|
|
|
|
|
|
def test_create_user_cli_uses_environment_password(tmp_path, monkeypatch) -> None:
|
|
db_url = f"sqlite:///{tmp_path/'create_user.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
|
|
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
|
|
|
|
assert result.exit_code == 0
|
|
assert "created user admin" in result.output
|
|
with session_scope() as session:
|
|
user = session.query(User).filter_by(username="admin").one()
|
|
assert user.admin is True
|
|
|
|
|
|
def test_create_user_cli_rejects_short_password(tmp_path, monkeypatch) -> None:
|
|
db_url = f"sqlite:///{tmp_path/'short_pw.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "short7x")
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
|
|
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
|
|
|
|
assert result.exit_code != 0
|
|
assert "at least 8" in result.output
|
|
|
|
|
|
def test_create_user_cli_rejects_empty_environment_password(tmp_path, monkeypatch) -> None:
|
|
db_url = f"sqlite:///{tmp_path/'empty_password.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "")
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
|
|
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
|
|
|
|
assert result.exit_code != 0
|
|
assert "password must not be empty" in result.output
|
|
|
|
|
|
def test_validate_new_password_rejects_empty():
|
|
from l4d2web.auth import validate_new_password
|
|
assert validate_new_password("") == "password must not be empty"
|
|
|
|
|
|
def test_validate_new_password_rejects_short():
|
|
from l4d2web.auth import MIN_PASSWORD_LENGTH, validate_new_password
|
|
assert MIN_PASSWORD_LENGTH == 8
|
|
assert validate_new_password("a" * 7) == "password must be at least 8 characters"
|
|
|
|
|
|
def test_validate_new_password_accepts_min_length():
|
|
from l4d2web.auth import validate_new_password
|
|
assert validate_new_password("a" * 8) is None
|
|
|
|
|
|
def test_create_user_cli_rejects_duplicate_username(tmp_path, monkeypatch) -> None:
|
|
db_url = f"sqlite:///{tmp_path/'duplicate_user.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
monkeypatch.setenv("LEFT4ME_ADMIN_PASSWORD", "secretpw1")
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
with session_scope() as session:
|
|
session.add(User(username="admin", password_digest=hash_password("secret"), admin=True))
|
|
|
|
result = app.test_cli_runner().invoke(args=["create-user", "admin", "--admin"])
|
|
|
|
assert result.exit_code != 0
|
|
assert "user already exists" in result.output
|