"""Rate-limit tests for the admin auth flow. Covers: - IP-level SlowAPI limit: 6th POST /admin/login from the same IP within 15 minutes returns 429 + ``rate_limited.html`` body. - Per-email DB limit: 6th POST /admin/login for the same email (even from different IPs — which TestClient can't really simulate without middleware tricks, so we directly insert 5 recent tokens to prime the DB) returns 429. - Every 429 path writes a ``rate_limited`` audit event. """ from __future__ import annotations import importlib from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Iterator import pytest from fastapi.testclient import TestClient from sqlalchemy import text @pytest.fixture def admin_app( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> Iterator[tuple[TestClient, dict]]: """Build a fresh FastAPI app wired to a tmp DB + captured email URL.""" monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path}/admin.db") monkeypatch.setenv("ADMIN_EMAILS", "headhen@example.com") monkeypatch.setenv("APP_ENV", "development") monkeypatch.setenv( "SECRET_KEY", "test-only-secret-key-0123456789abcdef-XYZ" ) monkeypatch.setenv("RESEND_API_KEY", "") monkeypatch.setenv("PUBLIC_BASE_URL", "http://testserver") from app import config as _config _config.get_settings.cache_clear() import app.main as main_module importlib.reload(main_module) app = main_module.app captured: dict = {"urls": []} app.state.email_service.send_magic_link = lambda **kw: captured["urls"].append( # type: ignore[assignment] kw["url"] ) from app.services.rate_limit import limiter limiter.reset() with TestClient(app) as client: yield client, captured _config.get_settings.cache_clear() def test_ip_rate_limit_trips_on_sixth(admin_app) -> None: """Five POSTs succeed; the sixth from the same IP returns 429.""" client, _ = admin_app # Use a non-allowlisted email so we don't bump the DB per-email # limit at the same time — isolates the IP-level limit. for i in range(5): resp = client.post( "/admin/login", data={"email": f"nobody{i}@example.com"} ) assert resp.status_code == 200, (i, resp.text[:200]) resp = client.post("/admin/login", data={"email": "nobody5@example.com"}) assert resp.status_code == 429 assert "Too many attempts" in resp.text # Audit row written. with client.app.state.engine.connect() as conn: rows = conn.execute( text( "SELECT detail FROM auth_events" " WHERE event_type = 'rate_limited'" ) ).mappings().all() assert len(rows) >= 1 assert any("\"scope\": \"ip\"" in str(r["detail"]) for r in rows) def test_per_email_rate_limit_trips_on_sixth( admin_app, monkeypatch: pytest.MonkeyPatch ) -> None: """Priming 5 recent tokens for an email causes the 6th request to 429.""" client, _ = admin_app # Disable the SlowAPI IP limiter temporarily so we can isolate the # DB-side per-email check. We can't call six requests through the # IP limiter within 15 minutes — the IP limit would trip first. from app.services.rate_limit import limiter limiter.enabled = False try: # Seed 5 recent tokens for the allowlisted email. now = datetime.now(timezone.utc) with client.app.state.engine.begin() as conn: for i in range(5): conn.execute( text( "INSERT INTO magic_link_tokens" " (email, token_hash, created_at, expires_at," " request_ip)" " VALUES (:e, :h, :c, :x, :ip)" ), { "e": "headhen@example.com", "h": f"seed-hash-{i}-" + ("x" * 50), "c": now.isoformat(), "x": (now + timedelta(minutes=15)).isoformat(), "ip": "", }, ) # 6th request trips the DB-side limiter. resp = client.post( "/admin/login", data={"email": "headhen@example.com"} ) assert resp.status_code == 429 assert "Too many attempts" in resp.text # rate_limited audit row with scope=email. with client.app.state.engine.connect() as conn: rows = conn.execute( text( "SELECT detail FROM auth_events" " WHERE event_type = 'rate_limited'" ) ).mappings().all() assert any("\"scope\": \"email\"" in str(r["detail"]) for r in rows) finally: limiter.enabled = True limiter.reset()