feat: phase 3 admin magic-link auth — tokens, sessions, rate limits, audit

End-to-end passwordless admin auth. /admin/login accepts an email, POSTs
mint a 256-bit magic-link token stored only as SHA-256 in
magic_link_tokens (15-min TTL, single-use via atomic rowcount UPDATE).
Resend delivers the link; in dev with no API key, EmailService logs a
structured magic_link_dev_fallback event with the URL so the flow works
offline. /admin/auth/consume/{token} verifies, upserts a users row
(display_name from email local-part), creates a sessions row, and drops
an itsdangerous-signed cb_session cookie (HttpOnly, SameSite=Lax, Secure
in prod). /admin renders a placeholder "Welcome, <name>" page pending
Phase 4 CMS. /admin/logout flips revoked_at rather than deleting the row
to preserve the audit trail.

Rate limits use SlowAPI's in-memory limiter (5/15min/IP on login,
20/15min/IP on consume) plus a DB per-email count to catch
IP-rotating abuse. ADMIN_EMAILS enforces allowlist; non-allowlisted
submissions return the same "check your inbox" page with no token
inserted and no email sent (anti-enumeration). Every event lands in
auth_events via AuditService: link_requested, link_consumed,
consume_failed, session_created, session_revoked, rate_limited.

Add a production config validator refusing empty RESEND_API_KEY,
RESEND_FROM, or ADMIN_EMAILS; add PUBLIC_BASE_URL for email link
construction. CSRF deferred to Phase 6 per roadmap scoping; logout
handler marked # TODO(phase-6-csrf).

Mark Phase 3 complete in docs/ROADMAP.md.
This commit is contained in:
2026-04-21 16:20:51 -05:00
parent 4b088e5045
commit 59dea99079
25 changed files with 2673 additions and 15 deletions

119
app/services/audit.py Normal file
View File

@@ -0,0 +1,119 @@
"""Append-only auth audit log service.
Writes one row per auth event into the ``auth_events`` table. The rest of
the auth stack calls :meth:`AuditService.record` to persist a structured,
queryable audit trail without having to know the SQL or the row schema.
Security notes
--------------
- NEVER pass raw tokens, raw session IDs, or email bodies into ``detail``.
Correlate sessions via the last 6 hex chars of their stored hash, never
the full hash and never the raw value (CWE-200).
- ``detail`` is persisted as JSON text; the schema column is ``TEXT NOT
NULL DEFAULT '{}'`` and the writer always provides a valid JSON object.
- All writes go through parameterized SQL with ``sqlalchemy.text``
``:bind`` parameters; no string interpolation (CWE-89).
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Mapping, Optional
import structlog
from sqlalchemy import Engine, text
_log = structlog.get_logger(__name__)
class AuditService:
"""Persist rows into ``auth_events``.
The service is intentionally tiny: one write method plus a helper
fetcher used by tests. No caching (this is an append-only audit
log and reads are rare).
"""
def __init__(self, engine: Engine) -> None:
"""Store the shared SQLAlchemy engine by reference.
The service never opens its own engine — it reuses the one
wired on ``app.state.engine``.
"""
self._engine: Engine = engine
def record(
self,
event_type: str,
*,
email: Optional[str] = None,
user_id: Optional[int] = None,
ip: str = "",
user_agent: str = "",
detail: Optional[Mapping[str, Any]] = None,
) -> None:
"""Insert a single audit row.
Parameters
----------
event_type:
One of the Phase 3 event types: ``link_requested``,
``link_consumed``, ``consume_failed``, ``session_created``,
``session_revoked``, ``rate_limited``.
email:
Submitted / target email (nullable when the event doesn't
have one, e.g. session_revoked where we key off user_id).
user_id:
Foreign key into ``users``; nullable for pre-auth events.
ip:
Client IP at time of event. Always captured when available;
empty string is acceptable for events originating outside
a request context (which Phase 3 does not currently emit,
but the column is NOT NULL and we want the door closed).
user_agent:
Client UA at time of event. Same NOT-NULL rationale as ``ip``.
detail:
Event-specific structured context (dict-like). Serialized
to a compact JSON string. Defaults to ``{}`` when absent.
NEVER put a raw token or session ID here — only hashes (or
their last 6 chars) and other non-sensitive metadata.
"""
# Always serialize to JSON text; the DB column enforces NOT NULL
# with an empty-object default, and we honor that contract here
# rather than relying on the default.
detail_json = json.dumps(dict(detail) if detail is not None else {})
now_iso = datetime.now(timezone.utc).isoformat()
with self._engine.begin() as conn:
conn.execute(
text(
"INSERT INTO auth_events"
" (event_type, email, user_id, ip, user_agent,"
" created_at, detail)"
" VALUES (:event_type, :email, :user_id, :ip,"
" :user_agent, :created_at, :detail)"
),
{
"event_type": event_type,
"email": email,
"user_id": user_id,
"ip": ip or "",
"user_agent": user_agent or "",
"created_at": now_iso,
"detail": detail_json,
},
)
# Mirror the audit row to structured logs at INFO. We never log
# the raw token / session ID, only the same detail dict (which
# the caller already scrubbed) and the non-sensitive envelope.
_log.info(
"auth_event",
event_type=event_type,
email=email,
user_id=user_id,
detail=detail or {},
)

399
app/services/auth.py Normal file
View File

@@ -0,0 +1,399 @@
"""Magic-link auth orchestration.
Glues token issuance / consumption, user auto-upsert on consume,
email delivery, session creation, and audit logging together behind
two methods:
- :meth:`AuthService.request_link` — handle POST /admin/login
- :meth:`AuthService.consume` — handle GET /admin/auth/consume/{token}
Security decisions are concentrated here:
- Raw tokens live only in memory, the outbound email URL, and the
single SHA-256 hash that ends up in the DB.
- The allowlist check is ALWAYS performed with lowercased emails.
- Non-allowlisted requests receive an identical response shape (handled
by the caller; this service just short-circuits the token issue and
still audits via ``link_requested`` with ``allowlisted=false``).
- Rate limiting has two layers:
1. SlowAPI IP-level decorator on the route (outside this module).
2. DB-side per-email COUNT inside :meth:`request_link` — returns a
sentinel that the route converts into an HTTP 429.
- Consume is atomic: an ``UPDATE ... WHERE token_hash=? AND used_at IS
NULL AND expires_at > ?`` with a ``rowcount == 1`` guard.
"""
from __future__ import annotations
import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
import structlog
from sqlalchemy import Engine, text
from app.config import Settings
from app.models.entities import Session, User
from app.models.mappers import row_to_user
from app.services.audit import AuditService
from app.services.email import EmailService
from app.services.sessions import SessionService
_log = structlog.get_logger(__name__)
# Per-email rate limit: at most 5 tokens issued in a 15-minute window.
# This is the DB-backed layer that survives process restarts; the
# SlowAPI IP-level decorator on the route is the first line of defense.
_PER_EMAIL_WINDOW_MIN: int = 15
_PER_EMAIL_MAX: int = 5
def _sha256(raw: str) -> str:
"""Return the hex SHA-256 of a raw token.
SHA-256 for one-way hashing of high-entropy tokens is acceptable
(see docs/security.md CWE-327). These tokens are already ≥256 bits
from ``secrets.token_urlsafe(32)`` so a KDF is unnecessary.
"""
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
class RateLimitedError(Exception):
"""Raised when a per-email rate limit trips inside the service.
Surfaced to the route layer so it can emit a 429 + render the
``rate_limited.html`` template. Not used for IP-level limits —
those are handled by SlowAPI's own exception.
"""
class AuthService:
"""Request-link / consume orchestration for admin auth."""
def __init__(
self,
engine: Engine,
email: EmailService,
sessions: SessionService,
audit: AuditService,
settings: Settings,
) -> None:
"""Store collaborators by reference."""
self._engine: Engine = engine
self._email: EmailService = email
self._sessions: SessionService = sessions
self._audit: AuditService = audit
self._settings: Settings = settings
# ------------------------------------------------------------------
# request_link
# ------------------------------------------------------------------
def request_link(
self,
*,
email: str,
ip: str,
user_agent: str,
) -> None:
"""Handle POST /admin/login for a validated email.
Behavior
--------
1. Lowercase the email (allowlist comparison is case-insensitive).
2. Check the admin allowlist.
3. If NOT allowlisted: audit ``link_requested`` with
``allowlisted=false`` and return. No token row, no email.
4. If allowlisted: run the DB-side per-email rate-limit check.
On trip: audit ``rate_limited`` and raise
:class:`RateLimitedError`.
5. Otherwise: insert a fresh token row (hash at rest), audit
``link_requested`` with ``allowlisted=true``, and call
:meth:`EmailService.send_magic_link`.
Callers MUST render the same "check your inbox" page regardless
of the allowlist branch — see the admin route for the
anti-enumeration contract.
"""
email = email.strip().lower()
allowlisted = email in self._settings.admin_emails_list
# Always audit the request — this is the trail that catches
# non-allowlisted attempts without leaking that info back to
# the submitter.
if not allowlisted:
self._audit.record(
"link_requested",
email=email,
ip=ip,
user_agent=user_agent,
detail={"allowlisted": False},
)
return
# DB-side per-email rate limit. We run it AFTER the allowlist
# check so non-allowlisted spam doesn't cause extra queries.
cutoff = datetime.now(timezone.utc) - timedelta(
minutes=_PER_EMAIL_WINDOW_MIN
)
with self._engine.connect() as conn:
row = conn.execute(
text(
"SELECT COUNT(*) AS c FROM magic_link_tokens"
" WHERE email = :email AND created_at > :cutoff"
),
{"email": email, "cutoff": cutoff.isoformat()},
).mappings().first()
recent_count = int(row["c"]) if row is not None else 0
if recent_count >= _PER_EMAIL_MAX:
self._audit.record(
"rate_limited",
email=email,
ip=ip,
user_agent=user_agent,
detail={"scope": "email", "endpoint": "/admin/login"},
)
raise RateLimitedError("per-email token limit reached")
# Mint the token — raw lives only in memory + email URL.
raw = secrets.token_urlsafe(32)
token_hash = _sha256(raw)
now = datetime.now(timezone.utc)
expires_at = now + timedelta(
minutes=self._settings.magic_link_ttl_min
)
with self._engine.begin() as conn:
conn.execute(
text(
"INSERT INTO magic_link_tokens"
" (email, token_hash, created_at, expires_at,"
" request_ip)"
" VALUES (:email, :token_hash, :created_at,"
" :expires_at, :request_ip)"
),
{
"email": email,
"token_hash": token_hash,
"created_at": now.isoformat(),
"expires_at": expires_at.isoformat(),
"request_ip": ip or "",
},
)
self._audit.record(
"link_requested",
email=email,
ip=ip,
user_agent=user_agent,
detail={"allowlisted": True},
)
# Build the magic-link URL. Using a path param (not a query
# string) keeps the raw token out of many access-log formats.
base = self._settings.public_base_url.rstrip("/")
url = f"{base}/admin/auth/consume/{raw}"
display_name = email.split("@", 1)[0].title() or email
# EmailService never raises in dev; in prod it may log an
# exception but still return. Either way the request-path
# response is identical.
self._email.send_magic_link(
to=email,
url=url,
display_name=display_name,
ttl_min=self._settings.magic_link_ttl_min,
expires_at=expires_at,
)
# ------------------------------------------------------------------
# consume
# ------------------------------------------------------------------
def consume(
self,
*,
raw_token: str,
ip: str,
user_agent: str,
) -> Optional[tuple[User, Session, str]]:
"""Consume a magic-link token and issue a session.
Returns ``(user, session, cookie_value)`` on success, ``None``
on any invalid / expired / already-used / unknown token (the
caller should render the generic failure page with HTTP 400).
Concurrency safety: the UPDATE statement's WHERE clause is the
atomic guard — if two requests race with the same token, only
one will get ``rowcount == 1``. The loser is treated as a
replay.
"""
if not raw_token:
# Audit runs in its own transaction (see notes below) so we
# can record the event without opening a write lock we'd
# then try to re-enter from this service.
self._audit.record(
"consume_failed",
ip=ip,
user_agent=user_agent,
detail={"reason": "not_found"},
)
return None
token_hash = _sha256(raw_token)
now = datetime.now(timezone.utc)
now_iso = now.isoformat()
# ``engine.begin()`` holds a write lock on SQLite for its whole
# scope. AuditService opens its OWN transaction and would be
# blocked behind this one — so we do not call ``self._audit``
# until AFTER this block exits. Track the outcome locally.
reloaded = None
failure_reason: Optional[str] = None
email: Optional[str] = None
user_id: Optional[int] = None
# Atomic single-use: only mark the row used if it's still
# valid. rowcount tells us whether we were the one to claim it.
with self._engine.begin() as conn:
update_result = conn.execute(
text(
"UPDATE magic_link_tokens"
" SET used_at = :now"
" WHERE token_hash = :h"
" AND used_at IS NULL"
" AND expires_at > :now"
),
{"now": now_iso, "h": token_hash},
)
if update_result.rowcount != 1:
# Distinguish expired / used / not_found for the audit
# trail (not for the client response). Run an extra
# SELECT only on the failure path so the happy path
# stays a single write.
row = conn.execute(
text(
"SELECT expires_at, used_at FROM magic_link_tokens"
" WHERE token_hash = :h LIMIT 1"
),
{"h": token_hash},
).mappings().first()
if row is None:
failure_reason = "not_found"
elif row["used_at"] is not None:
failure_reason = "used"
else:
failure_reason = "expired"
else:
# Token claimed. Look up the email so we can upsert the user.
token_row = conn.execute(
text(
"SELECT email FROM magic_link_tokens"
" WHERE token_hash = :h LIMIT 1"
),
{"h": token_hash},
).mappings().first()
# Should always exist — the UPDATE just wrote to it.
if token_row is None: # pragma: no cover — impossible path
failure_reason = "not_found"
else:
email = token_row["email"]
# Upsert user. Existing row → bump last_login_at and
# re-activate; missing row → insert with titlecased
# local part as display_name and active=1.
user_row = conn.execute(
text("SELECT id FROM users WHERE email = :e"),
{"e": email},
).mappings().first()
if user_row is None:
display_name = email.split("@", 1)[0].title() or email
insert_result = conn.execute(
text(
"INSERT INTO users"
" (email, display_name, created_at,"
" last_login_at, active)"
" VALUES (:email, :display_name, :created_at,"
" :last_login_at, 1)"
),
{
"email": email,
"display_name": display_name,
"created_at": now_iso,
"last_login_at": now_iso,
},
)
user_id = int(insert_result.lastrowid) # type: ignore[arg-type]
else:
user_id = int(user_row["id"])
conn.execute(
text(
"UPDATE users"
" SET last_login_at = :now, active = 1"
" WHERE id = :id"
),
{"now": now_iso, "id": user_id},
)
# Reload the user row so we return a fully-populated
# entity.
reloaded = conn.execute(
text(
"SELECT id, email, display_name, created_at,"
" last_login_at, active"
" FROM users WHERE id = :id"
),
{"id": user_id},
).mappings().first()
# --- Post-transaction: audit + session create --------------------
# Running these AFTER the transaction closes avoids the SQLite
# single-writer deadlock that would happen if AuditService tried
# to open a new write connection from inside the block above.
if failure_reason is not None:
self._audit.record(
"consume_failed",
ip=ip,
user_agent=user_agent,
detail={"reason": failure_reason},
)
return None
if reloaded is None: # pragma: no cover — impossible path
return None
user = row_to_user(reloaded)
# Session creation runs in its own transaction now that the
# consume write lock has been released.
session, cookie_value = self._sessions.create(
user_id=user.id, ip=ip, user_agent=user_agent
)
self._audit.record(
"link_consumed",
email=email,
user_id=user.id,
ip=ip,
user_agent=user_agent,
detail={"user_id": user.id},
)
self._audit.record(
"session_created",
email=email,
user_id=user.id,
ip=ip,
user_agent=user_agent,
# last 6 hex chars of the stored hash — enough to correlate,
# not enough to reverse.
detail={
"session_id": session.token_hash[-6:],
"user_id": user.id,
},
)
return user, session, cookie_value

141
app/services/email.py Normal file
View File

@@ -0,0 +1,141 @@
"""Transactional email sender with a dev-mode log fallback.
Thin wrapper around the Resend API (``resend.Emails.send``). Renders
both HTML and plaintext magic-link bodies from Jinja templates to keep
copy out of Python code.
Security and UX rules
---------------------
- **Never raise on missing credentials in development.** A 500 from the
login POST would expose whether an email is on the allowlist
(successful sends would succeed, non-allowlisted "sends" would still
short-circuit) and it would also break local dev. In development we
log a ``magic_link_dev_fallback`` structured event with the full
magic-link URL so the developer can copy it.
- **Production must fail at startup** if Resend credentials are absent;
that validator lives in :class:`app.config.Settings`, not here.
- Never log the raw token on its own — only as part of the URL in the
dev fallback (which is the whole point of the fallback).
"""
from __future__ import annotations
from datetime import datetime
from typing import Optional
import structlog
from fastapi.templating import Jinja2Templates
from app.config import Settings
_log = structlog.get_logger(__name__)
class EmailService:
"""Send magic-link emails via Resend, with a dev-mode log fallback."""
def __init__(self, settings: Settings, templates: Jinja2Templates) -> None:
"""Store dependencies by reference.
Parameters
----------
settings:
Application settings; used to pick up ``resend_api_key`` /
``resend_from`` / ``app_env`` at send time so rotating them
at runtime (dev) works.
templates:
Shared Jinja2 environment. We reuse the app-level one so
template autoescape defaults and the search path match the
rest of the site.
"""
self._settings: Settings = settings
self._templates: Jinja2Templates = templates
def send_magic_link(
self,
*,
to: str,
url: str,
display_name: str,
ttl_min: int,
expires_at: datetime,
) -> None:
"""Send a magic-link email to ``to`` or log the URL in dev.
Behavior
--------
- If ``settings.resend_api_key`` is truthy, render both bodies
and send via Resend.
- Otherwise (development only — the production config validator
refuses to boot without a key), emit a structured log event
``magic_link_dev_fallback`` that includes the full URL.
Never raises in the dev fallback path; errors from the Resend
API surface as logged exceptions so the request-handler layer
always returns the same response shape regardless of whether
the email was actually sent (CWE-200 / anti-enumeration).
"""
# Build both template bodies first so any rendering error
# surfaces before we talk to the network.
ctx = {
"display_name": display_name,
"magic_link_url": url,
"expires_at": expires_at.isoformat(),
"ttl_min": ttl_min,
}
html_body = self._render("emails/magic_link.html", ctx)
text_body = self._render("emails/magic_link.txt", ctx)
api_key: Optional[str] = self._settings.resend_api_key
sender: Optional[str] = self._settings.resend_from
# Dev fallback path: no key configured. Log the URL at INFO so
# the developer can complete the flow, and return.
if not api_key or not sender:
_log.info(
"magic_link_dev_fallback",
to=to,
# Raw token is embedded in the URL; acceptable because
# this path ONLY runs in local dev (production validator
# refuses to boot without RESEND_API_KEY).
magic_link_url=url,
ttl_min=ttl_min,
)
return
# Real send. We import here to avoid taking a hard import-time
# dependency on `resend`'s module-level state during tests that
# never exercise the send path.
try:
import resend # type: ignore[import-untyped]
resend.api_key = api_key
resend.Emails.send(
{
"from": sender,
"to": to,
"subject": "Your Chicken Babies R Us admin login link",
"html": html_body,
"text": text_body,
}
)
_log.info("magic_link_email_sent", to=to)
except Exception: # noqa: BLE001
# Do NOT re-raise from the request path — see anti-enumeration
# note at module top. Log with redacted context.
_log.exception("magic_link_email_failed", to=to)
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _render(self, template_name: str, context: dict) -> str:
"""Render a Jinja template to a string.
We use the underlying Jinja environment directly so we get a
plain string back (``Jinja2Templates.TemplateResponse`` wraps
the output in an HTTP response, which is not what we want for
outbound email bodies).
"""
template = self._templates.env.get_template(template_name)
return template.render(**context)

View File

@@ -0,0 +1,44 @@
"""SlowAPI rate-limiter wiring for auth endpoints.
Only one limiter is used process-wide; it's built in
:func:`create_limiter` and stored on ``app.state.limiter`` so the
``@limiter.limit`` decorator can pick it up from the request.
Storage is ``memory://``. At this scale (single-digit requests/second,
single container) a persistent backend is not worth the operational
cost, and the consequences of losing limiter state on restart are
acceptable — the DB-side per-email check (in
:mod:`app.services.auth`) catches sustained abuse across restarts.
"""
from __future__ import annotations
from slowapi import Limiter
from slowapi.util import get_remote_address
# Process-wide singleton. Module-level because SlowAPI's ``@limiter.limit``
# decorator has to be applied at endpoint-definition time (before the
# router is wired into the FastAPI app), and that has to reference the
# same limiter instance that the request path consults via
# ``request.app.state.limiter``.
#
# Storage is ``memory://``: in-process, single-container scale. Restarts
# drop in-flight counters — acceptable because the DB-side per-email
# check in :mod:`app.services.auth` backs this up for longer-lived abuse
# patterns.
limiter: Limiter = Limiter(
key_func=get_remote_address,
storage_uri="memory://",
)
def create_limiter() -> Limiter:
"""Return the process-wide :class:`slowapi.Limiter` singleton.
Kept as a function (rather than exposing the module-level
``limiter`` directly) to match the service-factory pattern used by
:class:`AuditService` / :class:`EmailService` / ... and to give
tests a hook to monkeypatch if they ever need a per-test limiter.
"""
return limiter

240
app/services/sessions.py Normal file
View File

@@ -0,0 +1,240 @@
"""Server-side session issuance, lookup, and revocation.
Sessions combine two storage layers:
1. **Database row** in ``sessions``. Stores ``sha256(raw)`` as
``token_hash`` (never the raw token), ``expires_at`` set to
``SESSION_MAX_DAYS`` from creation, and ``revoked_at`` for
soft-delete on logout.
2. **Signed cookie** (``cb_session``). Carries the raw session id
wrapped by :class:`itsdangerous.URLSafeTimedSerializer` with the
configured ``SECRET_KEY`` and salt ``session``. The signature
prevents tampering; the DB lookup prevents replay after logout.
Security and behavior rules
---------------------------
- Raw session IDs live only in memory and the outbound cookie. The DB
stores only the SHA-256 hash.
- ``Secure`` cookie flag is OFF in development (so plain-HTTP
``127.0.0.1`` works) and ON in production. All other flags
(``HttpOnly``, ``SameSite=Lax``, ``Path=/``) are always set.
- Revocation flips ``revoked_at`` but NEVER deletes the row; the audit
trail must survive logout.
"""
from __future__ import annotations
import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
import structlog
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from sqlalchemy import Engine, text
from app.config import Settings
from app.models.entities import Session
from app.models.mappers import row_to_session
_log = structlog.get_logger(__name__)
# Cookie name used on every request. Shared constant so routes and
# dependencies don't drift.
COOKIE_NAME: str = "cb_session"
def _sha256(raw: str) -> str:
"""Return the hex SHA-256 of a raw token.
SHA-256 is explicitly permitted for non-password one-way hashing
(see docs/security.md CWE-327). Tokens here are already
high-entropy (256-bit ``secrets.token_urlsafe(32)``) so a single
pass is sufficient — we don't need a slow-hash KDF.
"""
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
class SessionService:
"""Create, look up, and revoke admin sessions."""
def __init__(
self,
engine: Engine,
signer: URLSafeTimedSerializer,
settings: Settings,
) -> None:
"""Store the engine, signer, and settings by reference.
Parameters
----------
engine:
Shared SQLAlchemy engine.
signer:
Pre-built ``itsdangerous.URLSafeTimedSerializer`` bound to
``settings.secret_key`` with salt ``"session"``. Injected
rather than constructed here so the same instance is used
across all consumers (and tests can monkeypatch).
settings:
Application settings; we read ``session_max_days`` and
``app_env`` (to decide the ``Secure`` cookie flag).
"""
self._engine: Engine = engine
self._signer: URLSafeTimedSerializer = signer
self._settings: Settings = settings
# ------------------------------------------------------------------
# Creation
# ------------------------------------------------------------------
def create(
self,
*,
user_id: int,
ip: str,
user_agent: str,
) -> tuple[Session, str]:
"""Mint a new session row and return the signed cookie value.
Returns a ``(session, cookie_value)`` tuple. The caller is
responsible for attaching the cookie to the HTTP response with
the appropriate flags — use :meth:`cookie_params` for those.
"""
raw = secrets.token_urlsafe(32) # ≥256 bits of entropy
token_hash = _sha256(raw)
now = datetime.now(timezone.utc)
expires_at = now + timedelta(days=self._settings.session_max_days)
with self._engine.begin() as conn:
result = conn.execute(
text(
"INSERT INTO sessions"
" (user_id, token_hash, created_at, expires_at,"
" ip, user_agent)"
" VALUES (:user_id, :token_hash, :created_at,"
" :expires_at, :ip, :user_agent)"
),
{
"user_id": user_id,
"token_hash": token_hash,
"created_at": now.isoformat(),
"expires_at": expires_at.isoformat(),
"ip": ip or "",
"user_agent": user_agent or "",
},
)
new_id = int(result.lastrowid) # type: ignore[arg-type]
row = conn.execute(
text(
"SELECT id, user_id, token_hash, created_at, expires_at,"
" ip, user_agent, revoked_at"
" FROM sessions WHERE id = :id"
),
{"id": new_id},
).mappings().first()
if row is None: # pragma: no cover — insert just succeeded
raise RuntimeError("failed to reload just-inserted session row")
session = row_to_session(row)
cookie_value = self._signer.dumps(raw)
return session, cookie_value
# ------------------------------------------------------------------
# Lookup
# ------------------------------------------------------------------
def lookup(self, cookie_value: Optional[str]) -> Optional[Session]:
"""Resolve a cookie value to an active :class:`Session`.
Returns ``None`` on:
- missing cookie
- invalid signature
- expired signature (we reject if older than
``session_max_days``)
- no matching row
- session revoked or past its ``expires_at``
All failure modes intentionally return the same ``None`` so
callers cannot distinguish them (CWE-200).
"""
if not cookie_value:
return None
max_age_s = self._settings.session_max_days * 86400
try:
raw: str = self._signer.loads(cookie_value, max_age=max_age_s)
except SignatureExpired:
_log.info("session_cookie_expired")
return None
except BadSignature:
_log.info("session_cookie_bad_signature")
return None
token_hash = _sha256(raw)
now_iso = datetime.now(timezone.utc).isoformat()
with self._engine.connect() as conn:
row = conn.execute(
text(
"SELECT id, user_id, token_hash, created_at, expires_at,"
" ip, user_agent, revoked_at"
" FROM sessions"
" WHERE token_hash = :h"
" AND revoked_at IS NULL"
" AND expires_at > :now"
" LIMIT 1"
),
{"h": token_hash, "now": now_iso},
).mappings().first()
if row is None:
return None
return row_to_session(row)
# ------------------------------------------------------------------
# Revocation
# ------------------------------------------------------------------
def revoke(self, session: Session) -> None:
"""Mark a session as revoked (soft-delete).
The row stays in the DB so the audit trail remains intact; the
``revoked_at`` timestamp is what the ``lookup`` query filters
on.
"""
now_iso = datetime.now(timezone.utc).isoformat()
with self._engine.begin() as conn:
conn.execute(
text(
"UPDATE sessions SET revoked_at = :now"
" WHERE id = :id AND revoked_at IS NULL"
),
{"now": now_iso, "id": session.id},
)
# ------------------------------------------------------------------
# Cookie helpers
# ------------------------------------------------------------------
def cookie_params(self) -> dict:
"""Return kwargs for ``response.set_cookie`` matching our policy.
Values:
- ``key`` = :data:`COOKIE_NAME`
- ``httponly=True``
- ``samesite="lax"``
- ``secure`` = True only in production (plain-HTTP 127.0.0.1
needs ``Secure=False`` in dev)
- ``max_age`` = ``session_max_days * 86400``
- ``path="/"``
The caller supplies ``value`` separately.
"""
return {
"key": COOKIE_NAME,
"httponly": True,
"samesite": "lax",
"secure": self._settings.app_env == "production",
"max_age": self._settings.session_max_days * 86400,
"path": "/",
}