"""End-to-end HTTP tests for the Phase 5 contact form.

Each test builds its own FastAPI app against a fresh temp-file SQLite
database and resets the SlowAPI limiter between runs, matching the
pattern used by ``test_admin_routes.py`` / ``test_rate_limit.py``.

We stub :meth:`HCaptchaService.verify` at the ``app.state`` boundary so
hCaptcha decisions are deterministic without network access; the
service's own unit tests exercise the real verification path.
"""

from __future__ import annotations

import importlib
from pathlib import Path
from typing import Iterator

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import text


@pytest.fixture
def contact_app(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> Iterator[tuple[TestClient, dict]]:
    """Build a fresh FastAPI app wired to a tmp DB + captured sends.

    Yields a ``(client, captured)`` pair. ``captured["emails"]`` collects
    the keyword arguments of every intercepted
    ``send_contact_notification`` call and ``captured["hcaptcha"]``
    collects the ``token`` / ``ip`` of every stubbed ``verify`` call.
    """
    monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path}/contact.db")
    monkeypatch.setenv("ADMIN_EMAILS", "headhen@example.com")
    monkeypatch.setenv("ADMIN_CONTACT_EMAIL", "head-hen@example.com")
    monkeypatch.setenv("APP_ENV", "development")
    monkeypatch.setenv(
        "SECRET_KEY", "test-only-secret-key-0123456789abcdef-XYZ"
    )
    monkeypatch.setenv("RESEND_API_KEY", "")
    monkeypatch.setenv("HCAPTCHA_SECRET", "")
    monkeypatch.setenv("HCAPTCHA_SITE_KEY", "")
    monkeypatch.setenv("PUBLIC_BASE_URL", "http://testserver")

    # Imported lazily so the environment overrides above are in place
    # before the settings cache is cleared and the app module is reloaded.
    from app import config as _config

    _config.get_settings.cache_clear()

    import app.main as main_module

    importlib.reload(main_module)
    app = main_module.app

    captured: dict = {"emails": [], "hcaptcha": []}

    # Intercept the email dispatch so we can assert it was called
    # (or NOT called) without talking to Resend.
    def _capture_email(**kw) -> None:
        captured["emails"].append(kw)

    app.state.email_service.send_contact_notification = _capture_email  # type: ignore[assignment]

    # Default: hCaptcha returns True. Individual tests override.
    async def _hc_ok(token: str, remote_ip: str) -> bool:
        captured["hcaptcha"].append({"token": token, "ip": remote_ip})
        return True

    app.state.hcaptcha_service.verify = _hc_ok  # type: ignore[assignment]

    # Reset rate limiter between tests (module-level singleton).
    from app.services.rate_limit import limiter

    limiter.reset()

    # try/finally so the settings cache is cleared even when TestClient
    # startup raises before the fixture ever reaches its yield.
    try:
        with TestClient(app) as client:
            yield client, captured
    finally:
        _config.get_settings.cache_clear()


def _count(app, table: str, where: str = "1=1") -> int:
    """Return the number of rows in ``table`` matching ``where``.

    Both arguments are interpolated directly into the SQL text, so pass
    trusted literals only — never request-derived input.
    """
    with app.state.engine.connect() as conn:
        row = conn.execute(
            text(f"SELECT COUNT(*) AS c FROM {table} WHERE {where}")
        ).mappings().first()
    return int(row["c"]) if row is not None else 0


def _audit_details(app, event_type: str) -> list[str]:
    """Return the ``detail`` column of ``auth_events`` rows of ``event_type``.

    Rows are ordered by ``id`` and each detail is coerced to ``str``.
    """
    with app.state.engine.connect() as conn:
        rows = conn.execute(
            text(
                "SELECT detail FROM auth_events WHERE event_type = :t"
                " ORDER BY id"
            ),
            {"t": event_type},
        ).mappings().all()
    return [str(r["detail"]) for r in rows]


def test_get_contact_renders_live_form(contact_app) -> None:
    """GET /contact shows the live form + honeypot + no disabled attr."""
    client, _ = contact_app
    resp = client.get("/contact")
    assert resp.status_code == 200
    assert "Get in touch" in resp.text
    assert 'method="POST"' in resp.text
    assert 'action="/contact"' in resp.text
    # Honeypot field is present but inside a visually-hidden container.
    assert 'name="website"' in resp.text
    assert 'aria-hidden="true"' in resp.text
    # Dev: no hCaptcha site key → widget not rendered.
    assert "js.hcaptcha.com" not in resp.text


def test_post_contact_happy_path_persists_and_sends(contact_app) -> None:
    """A valid submission writes a row, calls email, renders success."""
    client, captured = contact_app
    # Use a message whose final words sit past the 40-char preview
    # cutoff so the full body doesn't leak into the audit detail.
    long_message = (
        "Hello there, I'd like to reserve a ROOSTER named Bernard as soon"
        " as possible."
    )
    resp = client.post(
        "/contact",
        data={
            "name": "Ada Lovelace",
            "email": "ada@example.com",
            "message": long_message,
            "website": "",  # honeypot empty
        },
    )
    assert resp.status_code == 200
    assert "Thanks for reaching out" in resp.text

    # Row persisted.
    assert _count(client.app, "contact_submissions") == 1
    with client.app.state.engine.connect() as conn:
        row = conn.execute(
            text("SELECT name, email, message FROM contact_submissions")
        ).mappings().first()
    assert row is not None
    assert row["name"] == "Ada Lovelace"
    assert row["email"] == "ada@example.com"
    assert row["message"] == long_message

    # Email dispatched (intercepted).
    assert len(captured["emails"]) == 1
    assert captured["emails"][0]["to"] == "head-hen@example.com"
    assert captured["emails"][0]["submission_name"] == "Ada Lovelace"

    # Audit row emitted.
    details = _audit_details(client.app, "contact_submitted")
    assert len(details) == 1
    assert "message_length" in details[0]
    # The full message body must NOT leak in the audit detail — only
    # the first 40 chars (preview) and the length.
    assert "Bernard" not in details[0], (
        "audit detail leaked past the 40-char preview window"
    )
    # Preview is truncated to 40 chars.
    assert "message_preview" in details[0]


def test_honeypot_tripped_rejected_silently(contact_app) -> None:
    """A non-empty honeypot short-circuits to the success page, no row."""
    client, captured = contact_app
    resp = client.post(
        "/contact",
        data={
            "name": "Spammy",
            "email": "spam@example.com",
            "message": "This is a valid-looking message.",
            "website": "http://spam.example.com",
        },
    )
    assert resp.status_code == 200
    assert "Thanks for reaching out" in resp.text
    # No DB row, no email.
    assert _count(client.app, "contact_submissions") == 0
    assert captured["emails"] == []
    # Audit row captures the rejection reason.
    details = _audit_details(client.app, "contact_spam_rejected")
    assert len(details) == 1
    assert '"reason": "honeypot"' in details[0]


def test_hcaptcha_fail_rejected_silently(contact_app) -> None:
    """hCaptcha False also lands on the generic success page silently."""
    client, captured = contact_app

    async def _hc_fail(token: str, remote_ip: str) -> bool:
        return False

    client.app.state.hcaptcha_service.verify = _hc_fail  # type: ignore[assignment]

    resp = client.post(
        "/contact",
        data={
            "name": "Ada",
            "email": "ada@example.com",
            "message": "Please get back to me about eggs.",
            "website": "",
        },
    )
    assert resp.status_code == 200
    assert "Thanks for reaching out" in resp.text
    assert _count(client.app, "contact_submissions") == 0
    assert captured["emails"] == []
    details = _audit_details(client.app, "contact_spam_rejected")
    assert len(details) == 1
    assert '"reason": "hcaptcha"' in details[0]


def test_validation_errors_rerender_form(contact_app) -> None:
    """Empty / short / malformed fields re-render the form at 400."""
    client, _ = contact_app
    # Empty name + bad email + short message.
    resp = client.post(
        "/contact",
        data={
            "name": "",
            "email": "not-an-email",
            "message": "hi",
            "website": "",
        },
    )
    assert resp.status_code == 400
    # Inline error messages surface.
    assert "Please enter your name." in resp.text
    assert "valid email" in resp.text
    assert "at least 10 characters" in resp.text
    # The form echoes the submitted values so the user doesn't retype.
    assert 'value="not-an-email"' in resp.text
    # Nothing persisted.
    assert _count(client.app, "contact_submissions") == 0


def test_name_too_long_rejected(contact_app) -> None:
    """Name > 80 chars is rejected with an inline error."""
    client, _ = contact_app
    resp = client.post(
        "/contact",
        data={
            "name": "A" * 81,
            "email": "ada@example.com",
            "message": "Please do get back to me.",
            "website": "",
        },
    )
    assert resp.status_code == 400
    assert "80 characters" in resp.text


def test_message_too_long_rejected(contact_app) -> None:
    """Message > 4000 chars is rejected with an inline error."""
    client, _ = contact_app
    resp = client.post(
        "/contact",
        data={
            "name": "Ada",
            "email": "ada@example.com",
            "message": "x" * 4001,
            "website": "",
        },
    )
    assert resp.status_code == 400
    assert "4000 characters" in resp.text


def test_rate_limit_trips_on_fourth(contact_app) -> None:
    """3 submissions/hour per IP; the 4th returns 429."""
    client, _ = contact_app
    data = {
        "name": "Ada",
        "email": "ada@example.com",
        "message": "Please get back to me about eggs.",
        "website": "",
    }
    for i in range(3):
        resp = client.post("/contact", data=data)
        assert resp.status_code == 200, (i, resp.text[:200])
    resp = client.post("/contact", data=data)
    assert resp.status_code == 429
    assert "Too many attempts" in resp.text


def test_email_send_failure_still_returns_success(
    contact_app, monkeypatch: pytest.MonkeyPatch
) -> None:
    """If the email dispatch blows up, the user still sees the thank-you page."""
    client, _ = contact_app

    def _boom(**kw) -> None:
        raise RuntimeError("pretend Resend died")

    # Installed through monkeypatch so the override is undone at teardown.
    monkeypatch.setattr(
        client.app.state.email_service, "send_contact_notification", _boom
    )

    resp = client.post(
        "/contact",
        data={
            "name": "Ada",
            "email": "ada@example.com",
            "message": "Please reach out when you have a moment.",
            "website": "",
        },
    )
    assert resp.status_code == 200
    assert "Thanks for reaching out" in resp.text
    # Row still persisted — the user's message is not lost even though
    # the notification failed.
    assert _count(client.app, "contact_submissions") == 1