Implements the change-password endpoint: - Per-IP rate limit reusing services/rate_limit - Required fields, mismatched-confirm, policy, wrong-current branches each redirect with a specific ?error= key - Rotates digest + password_changed_at, then re-stamps the current session marker so this browser stays logged in while other sessions get rejected by load_current_user Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
162 lines
5.1 KiB
Python
162 lines
5.1 KiB
Python
import pytest
|
|
|
|
from l4d2web.app import create_app
|
|
from l4d2web.auth import hash_password
|
|
from l4d2web.db import init_db, session_scope
|
|
from l4d2web.models import User
|
|
|
|
|
|
@pytest.fixture
|
|
def app_and_user(tmp_path, monkeypatch):
|
|
db_url = f"sqlite:///{tmp_path/'profile.db'}"
|
|
monkeypatch.setenv("DATABASE_URL", db_url)
|
|
app = create_app({"TESTING": True, "DATABASE_URL": db_url, "SECRET_KEY": "test"})
|
|
init_db()
|
|
with session_scope() as db:
|
|
u = User(username="alice", password_digest=hash_password("currentpass"))
|
|
db.add(u)
|
|
db.flush()
|
|
uid = u.id
|
|
marker = u.password_changed_at.isoformat()
|
|
return app, uid, marker
|
|
|
|
|
|
def _logged_in_client(app, uid, marker):
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["user_id"] = uid
|
|
sess["pw_changed_at"] = marker
|
|
sess["csrf_token"] = "test-token"
|
|
return client
|
|
|
|
|
|
def test_profile_requires_login(app_and_user):
|
|
app, _, _ = app_and_user
|
|
response = app.test_client().get("/profile", follow_redirects=False)
|
|
assert response.status_code == 302
|
|
assert "/login" in response.headers["Location"]
|
|
|
|
|
|
def test_profile_page_renders(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
response = client.get("/profile")
|
|
assert response.status_code == 200
|
|
body = response.get_data(as_text=True)
|
|
assert "Change password" in body
|
|
assert 'name="current_password"' in body
|
|
assert 'name="new_password"' in body
|
|
assert 'name="confirm_new_password"' in body
|
|
|
|
|
|
def test_base_template_links_username_to_profile(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
response = client.get("/dashboard")
|
|
body = response.get_data(as_text=True)
|
|
assert 'href="/profile"' in body
|
|
assert ">alice<" in body
|
|
|
|
|
|
from sqlalchemy import select
|
|
|
|
|
|
def _post_pw(client, **fields):
|
|
payload = {"csrf_token": "test-token", **fields}
|
|
return client.post("/profile/password", data=payload, follow_redirects=False)
|
|
|
|
|
|
def _digest_of(username):
|
|
with session_scope() as db:
|
|
u = db.scalar(select(User).where(User.username == username))
|
|
return u.password_digest
|
|
|
|
|
|
def test_post_password_rejects_missing_csrf(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = client.post(
|
|
"/profile/password",
|
|
data={
|
|
"current_password": "currentpass",
|
|
"new_password": "newpass12",
|
|
"confirm_new_password": "newpass12",
|
|
},
|
|
)
|
|
assert response.status_code == 400
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_requires_login(app_and_user):
|
|
app, _, _ = app_and_user
|
|
client = app.test_client()
|
|
with client.session_transaction() as sess:
|
|
sess["csrf_token"] = "test-token"
|
|
response = client.post(
|
|
"/profile/password",
|
|
data={
|
|
"csrf_token": "test-token",
|
|
"current_password": "x",
|
|
"new_password": "y" * 8,
|
|
"confirm_new_password": "y" * 8,
|
|
},
|
|
follow_redirects=False,
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/login" in response.headers["Location"]
|
|
|
|
|
|
def test_post_password_empty_fields_redirects_with_error(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(client, current_password="", new_password="", confirm_new_password="")
|
|
assert response.status_code == 302
|
|
assert "/profile?error=fields_required" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_mismatched_confirm(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(
|
|
client,
|
|
current_password="currentpass",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass99",
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/profile?error=mismatch" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_too_short(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(
|
|
client,
|
|
current_password="currentpass",
|
|
new_password="short7x",
|
|
confirm_new_password="short7x",
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/profile?error=too_short" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|
|
|
|
|
|
def test_post_password_wrong_current(app_and_user):
|
|
app, uid, marker = app_and_user
|
|
client = _logged_in_client(app, uid, marker)
|
|
before = _digest_of("alice")
|
|
response = _post_pw(
|
|
client,
|
|
current_password="WRONG",
|
|
new_password="newpass12",
|
|
confirm_new_password="newpass12",
|
|
)
|
|
assert response.status_code == 302
|
|
assert "/profile?error=wrong_current" in response.headers["Location"]
|
|
assert _digest_of("alice") == before
|