"""Admin auth routes — magic-link login, consume, landing, logout. Every handler here is deliberately thin: it delegates to :class:`app.services.auth.AuthService` or :class:`app.services.sessions.SessionService` and translates the result into an HTTP response. No SQL, no crypto, no email building. Anti-enumeration contract ------------------------- POST /admin/login always renders the same ``login_sent.html`` template with identical copy, identical status, and identical cookie state, regardless of whether the submitted address is on the allowlist. The only side-effects that differ are: - token row inserted (allowlisted) - email dispatched (allowlisted) - audit row has ``allowlisted=true`` vs ``false`` GET /admin/auth/consume/{token} likewise uses a single failure page for every bad-token reason (missing / unknown / expired / already used). The specific reason lives only in the audit log. """ from __future__ import annotations import re from typing import Optional import structlog from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from fastapi.templating import Jinja2Templates from app.dependencies.auth import require_admin from app.dependencies.csrf import require_csrf_form from app.models.entities import User from app.services.auth import AuthService, RateLimitedError from app.services.rate_limit import limiter from app.services.sessions import COOKIE_NAME, SessionService router: APIRouter = APIRouter(tags=["admin"]) _log = structlog.get_logger(__name__) # Lightweight email regex. We intentionally avoid ``email-validator`` # (not pinned in requirements.txt) and a full RFC-5322 parser; the goal # is only to reject empty strings and obviously-not-an-email submissions # before calling the allowlist. Full validation is unnecessary because # non-allowlisted addresses are silently ignored anyway. _EMAIL_RE: re.Pattern[str] = re.compile( r"^[^@\s]+@[^@\s]+\.[^@\s]+$" ) def _get_templates(request: Request) -> Jinja2Templates: """Return the app-scoped :class:`Jinja2Templates`.""" return request.app.state.templates def _get_auth_service(request: Request) -> AuthService: """Return the app-scoped :class:`AuthService`.""" return request.app.state.auth_service def _get_session_service(request: Request) -> SessionService: """Return the app-scoped :class:`SessionService`.""" return request.app.state.session_service def _client_ip(request: Request) -> str: """Best-effort client IP from the request. Starlette already respects ``X-Forwarded-For`` via the proxy-headers middleware Uvicorn installs with ``--proxy-headers``; that means ``request.client.host`` is the real client IP. """ return request.client.host if request.client else "" def _user_agent(request: Request) -> str: """Return the submitted User-Agent header (empty string if missing).""" return request.headers.get("user-agent", "") # --------------------------------------------------------------------------- # GET /admin/login # --------------------------------------------------------------------------- @router.get("/admin/login", response_class=HTMLResponse, summary="Admin login form") def admin_login_form( request: Request, templates: Jinja2Templates = Depends(_get_templates), ) -> HTMLResponse: """Render the email-entry form. The login form is deliberately NOT CSRF-protected: the user is pre-authentication and no session cookie exists yet, so there is no authenticated context for a forged request to hijack. The ``ADMIN_EMAILS`` allowlist and SlowAPI rate limit are what keep this endpoint safe. """ return templates.TemplateResponse( request, "admin/login.html", {"error": None, "email": ""}, ) # --------------------------------------------------------------------------- # POST /admin/login # --------------------------------------------------------------------------- @router.post("/admin/login", response_class=HTMLResponse, summary="Request magic link") @limiter.limit("5/15 minutes") def admin_login_submit( request: Request, email: str = Form(default=""), templates: Jinja2Templates = Depends(_get_templates), auth: AuthService = Depends(_get_auth_service), ) -> Response: """Handle the login form submission. Flow: 1. Normalize + validate the email format. On format error we re-render ``login.html`` with a message (this is a UX concession — an invalid email shape is not a successful submission, so there's no enumeration risk). 2. Call :meth:`AuthService.request_link`. 3. Regardless of allowlist membership: render ``login_sent.html`` with identical copy. On per-email rate-limit: render ``rate_limited.html`` + 429. Rate limiting ------------- The ``@limiter.limit`` decoration is applied dynamically from ``app.main`` so tests can bypass it — see the route registration helper in ``app.main``. """ normalized = (email or "").strip().lower() if not normalized or not _EMAIL_RE.match(normalized): # Format error surfaces as a flash; there's nothing to leak # here because we haven't checked the allowlist yet. return templates.TemplateResponse( request, "admin/login.html", { "error": "Please enter a valid email address.", "email": email or "", }, status_code=400, ) ip = _client_ip(request) ua = _user_agent(request) try: auth.request_link(email=normalized, ip=ip, user_agent=ua) except RateLimitedError: # Per-email DB-side limit tripped. The SlowAPI IP-level limit # is handled separately via the registered exception handler. return templates.TemplateResponse( request, "admin/rate_limited.html", {}, status_code=429, ) return templates.TemplateResponse( request, "admin/login_sent.html", {}, ) # --------------------------------------------------------------------------- # GET /admin/auth/consume/{token} # --------------------------------------------------------------------------- @router.get( "/admin/auth/consume/{token}", summary="Consume magic-link token", ) @limiter.limit("20/15 minutes") def admin_auth_consume( request: Request, token: str, templates: Jinja2Templates = Depends(_get_templates), auth: AuthService = Depends(_get_auth_service), sessions: SessionService = Depends(_get_session_service), ) -> Response: """Consume a magic-link token and, on success, set the session cookie. Failure path returns 400 + generic ``invalid or expired`` page. Success path sets ``cb_session`` and 303-redirects to ``/admin``. """ ip = _client_ip(request) ua = _user_agent(request) result = auth.consume(raw_token=token, ip=ip, user_agent=ua) if result is None: # Generic failure response — the audit log has the real reason. return templates.TemplateResponse( request, "admin/login_failed.html", {}, status_code=400, ) _user, _session, cookie_value = result # 303 forces the browser to GET /admin on the next request. response = RedirectResponse(url="/admin", status_code=303) response.set_cookie(value=cookie_value, **sessions.cookie_params()) return response # The authenticated landing page (``GET /admin``) now lives in # :mod:`app.routes.admin_cms` as the dashboard. This module stays # scoped to pre-auth / auth-lifecycle endpoints. # --------------------------------------------------------------------------- # POST /admin/logout # --------------------------------------------------------------------------- @router.post("/admin/logout", summary="Log out the current admin") def admin_logout( request: Request, user: User = Depends(require_admin), sessions: SessionService = Depends(_get_session_service), _csrf: None = Depends(require_csrf_form), ) -> Response: """Revoke the current session and clear the cookie. Always issues a 303 redirect to ``/admin/login`` so browsers transparently follow and show the login form (with no cookie). """ from app.services.sessions import SessionService as _SS # noqa: F401 # Look up the session again via the cookie so we can revoke it # and emit a properly-correlated audit row. cookie_value: Optional[str] = request.cookies.get(COOKIE_NAME) session = sessions.lookup(cookie_value) audit = request.app.state.audit_service if session is not None: sessions.revoke(session) audit.record( "session_revoked", email=user.email, user_id=user.id, ip=_client_ip(request), user_agent=_user_agent(request), detail={"session_id": session.token_hash[-6:]}, ) response = RedirectResponse(url="/admin/login", status_code=303) # Clear the cookie by setting an empty value with Max-Age=0 and the # same Path so the browser actually removes it. response.delete_cookie(key=COOKIE_NAME, path="/") return response