Exceeding the per-IP attempt cap within the window returns 429. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
260 lines
8 KiB
Python
260 lines
8 KiB
Python
import pytest
|
|
|
|
from l4d2web.app import create_app
|
|
from l4d2web.auth import hash_password
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import User
|
|
|
|
|
|
@pytest.fixture
|
|
def app_and_user(tmp_path, monkeypatch):
|
|
db_url = f"sqlite:///{tmp_path/'profile.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
with session_scope() as db:
|
|
u = User(username="alice", password_digest=hash_password("currentpass"))
|
|
db.add(u)
|
|
db.flush()
|
|
uid = u.id
|
|
marker = u.password_changed_at.isoformat()
|
|
return app, uid, marker
|
|
|
|
|
|
def _logged_in_client(app, uid, marker):
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = uid
|
|
sess["pw_changed_at"] = marker
|
|
sess["csrf_token"] = "test-token"
|
|
return client
|
|
|
|
|
|
def test_profile_requires_login(app_and_user):
|
|
app, _, _ = app_and_user
|
|
response = app.test_client().get("/profile", follow_redirects=False)
|
|
assert response.status_code == 302
|
|
assert "/login" in response.headers["Location"]
|
|
|
|
|
|
def test_profile_page_renders(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
response = client.get("/profile")
|
|
assert response.status_code == 200
|
|
body = response.get_data(as_text=True)
|
|
assert "Change password" in body
|
|
assert 'name="current_password"' in body
|
|
assert 'name="new_password"' in body
|
|
assert 'name="confirm_new_password"' in body
|
|
|
|
|
|
def test_base_template_links_username_to_profile(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
response = client.get("/dashboard")
|
|
body = response.get_data(as_text=True)
|
|
assert 'href="/profile"' in body
|
|
assert ">alice<" in body
|
|
|
|
|
|
from sqlalchemy import select
|
|
|
|
|
|
def _post_pw(client, **fields):
|
|
payload = {"csrf_token": "test-token", **fields}
|
|
return client.post("/profile/password", data=payload, follow_redirects=False)
|
|
|
|
|
|
def _digest_of(username):
|
|
with session_scope() as db:
|
|
u = db.scalar(select(User).where(User.username == username))
|
|
return u.password_digest
|
|
|
|
|
|
def test_post_password_rejects_missing_csrf(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = client.post(
|
|
"/profile/password",
|
|
data={
|
|
"current_password": "currentpass",
|
|
"new_password": "newpass12",
|
|
"confirm_new_password": "newpass12",
|
|
},
|
|
)
|
|
assert response.status_code == 400
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_requires_login(app_and_user):
|
|
app, _, _ = app_and_user
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["csrf_token"] = "test-token"
|
|
response = client.post(
|
|
"/profile/password",
|
|
data={
|
|
"csrf_token": "test-token",
|
|
"current_password": "x",
|
|
"new_password": "y" * 8,
|
|
"confirm_new_password": "y" * 8,
|
|
},
|
|
follow_redirects=False,
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/login" in response.headers["Location"]
|
|
|
|
|
|
def test_post_password_empty_fields_redirects_with_error(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
|
|
assert response.status_code == 302
|
|
assert "/profile?error=fields_required" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_mismatched_confirm(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(
|
|
client,
|
|
current_password="currentpass",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass99",
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/profile?error=mismatch" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_too_short(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(
|
|
client,
|
|
current_password="currentpass",
|
|
new_password="short7x",
|
|
confirm_new_password="short7x",
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/profile?error=too_short" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_wrong_current(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(
|
|
client,
|
|
current_password="WRONG",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass12",
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/profile?error=wrong_current" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_happy_path_rotates_and_keeps_current_session(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before_digest = _digest_of("alice")
|
|
with session_scope() as db:
|
|
before_marker = db.scalar(select(User).where(User.id == uid)).password_changed_at
|
|
|
|
response = _post_pw(
|
|
client,
|
|
current_password="currentpass",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass12",
|
|
)
|
|
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/profile?success=1")
|
|
|
|
after_digest = _digest_of("alice")
|
|
assert after_digest != before_digest
|
|
with session_scope() as db:
|
|
u = db.scalar(select(User).where(User.id == uid))
|
|
assert u.password_changed_at > before_marker
|
|
new_marker = u.password_changed_at.isoformat()
|
|
|
|
follow = client.get("/dashboard")
|
|
assert follow.status_code == 200
|
|
|
|
with client.session_transaction() as sess:
|
|
assert sess["pw_changed_at"] == new_marker
|
|
|
|
|
|
def test_other_session_dies_after_password_change(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
primary = _logged_in_client(app, uid, marker)
|
|
other = _logged_in_client(app, uid, marker)
|
|
|
|
pre = other.get("/dashboard")
|
|
assert pre.status_code == 200
|
|
|
|
response = _post_pw(
|
|
primary,
|
|
current_password="currentpass",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass12",
|
|
)
|
|
assert response.status_code == 302
|
|
|
|
post = other.get("/dashboard", follow_redirects=False)
|
|
assert post.status_code == 302
|
|
assert "/login" in post.headers["Location"]
|
|
|
|
|
|
def test_new_password_works_for_login(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
_post_pw(
|
|
client,
|
|
current_password="currentpass",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass12",
|
|
)
|
|
|
|
fresh = app.test_client()
|
|
response = fresh.post(
|
|
"/login",
|
|
data={"username": "alice", "password": "newpass12"},
|
|
)
|
|
assert response.status_code == 302
|
|
assert response.headers["Location"].endswith("/dashboard")
|
|
|
|
|
|
def test_post_password_rate_limited(app_and_user):
|
|
from l4d2web.routes.profile_routes import (
|
|
PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS,
|
|
reset_profile_password_rate_limits,
|
|
)
|
|
reset_profile_password_rate_limits()
|
|
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
|
|
for _ in range(PROFILE_PW_RATE_LIMIT_MAX_ATTEMPTS):
|
|
_post_pw(
|
|
client,
|
|
current_password="WRONG",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass12",
|
|
)
|
|
|
|
blocked = _post_pw(
|
|
client,
|
|
current_password="WRONG",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass12",
|
|
)
|
|
assert blocked.status_code == 429
|