"""Server-side session issuance, lookup, and revocation. Sessions combine two storage layers: 1. **Database row** in ``sessions``. Stores ``sha256(raw)`` as ``token_hash`` (never the raw token), ``expires_at`` set to ``SESSION_MAX_DAYS`` from creation, and ``revoked_at`` for soft-delete on logout. 2. **Signed cookie** (``cb_session``). Carries the raw session id wrapped by :class:`itsdangerous.URLSafeTimedSerializer` with the configured ``SECRET_KEY`` and salt ``session``. The signature prevents tampering; the DB lookup prevents replay after logout. Security and behavior rules --------------------------- - Raw session IDs live only in memory and the outbound cookie. The DB stores only the SHA-256 hash. - ``Secure`` cookie flag is OFF in development (so plain-HTTP ``127.0.0.1`` works) and ON in production. All other flags (``HttpOnly``, ``SameSite=Lax``, ``Path=/``) are always set. - Revocation flips ``revoked_at`` but NEVER deletes the row; the audit trail must survive logout. """ from __future__ import annotations import hashlib import secrets from datetime import datetime, timedelta, timezone from typing import Optional import structlog from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer from sqlalchemy import Engine, text from app.config import Settings from app.models.entities import Session from app.models.mappers import row_to_session _log = structlog.get_logger(__name__) # Cookie name used on every request. Shared constant so routes and # dependencies don't drift. COOKIE_NAME: str = "cb_session" def _sha256(raw: str) -> str: """Return the hex SHA-256 of a raw token. SHA-256 is explicitly permitted for non-password one-way hashing (see docs/security.md CWE-327). Tokens here are already high-entropy (256-bit ``secrets.token_urlsafe(32)``) so a single pass is sufficient — we don't need a slow-hash KDF. """ return hashlib.sha256(raw.encode("utf-8")).hexdigest() class SessionService: """Create, look up, and revoke admin sessions.""" def __init__( self, engine: Engine, signer: URLSafeTimedSerializer, settings: Settings, ) -> None: """Store the engine, signer, and settings by reference. Parameters ---------- engine: Shared SQLAlchemy engine. signer: Pre-built ``itsdangerous.URLSafeTimedSerializer`` bound to ``settings.secret_key`` with salt ``"session"``. Injected rather than constructed here so the same instance is used across all consumers (and tests can monkeypatch). settings: Application settings; we read ``session_max_days`` and ``app_env`` (to decide the ``Secure`` cookie flag). """ self._engine: Engine = engine self._signer: URLSafeTimedSerializer = signer self._settings: Settings = settings # ------------------------------------------------------------------ # Creation # ------------------------------------------------------------------ def create( self, *, user_id: int, ip: str, user_agent: str, ) -> tuple[Session, str]: """Mint a new session row and return the signed cookie value. Returns a ``(session, cookie_value)`` tuple. The caller is responsible for attaching the cookie to the HTTP response with the appropriate flags — use :meth:`cookie_params` for those. """ raw = secrets.token_urlsafe(32) # ≥256 bits of entropy token_hash = _sha256(raw) now = datetime.now(timezone.utc) expires_at = now + timedelta(days=self._settings.session_max_days) with self._engine.begin() as conn: result = conn.execute( text( "INSERT INTO sessions" " (user_id, token_hash, created_at, expires_at," " ip, user_agent)" " VALUES (:user_id, :token_hash, :created_at," " :expires_at, :ip, :user_agent)" ), { "user_id": user_id, "token_hash": token_hash, "created_at": now.isoformat(), "expires_at": expires_at.isoformat(), "ip": ip or "", "user_agent": user_agent or "", }, ) new_id = int(result.lastrowid) # type: ignore[arg-type] row = conn.execute( text( "SELECT id, user_id, token_hash, created_at, expires_at," " ip, user_agent, revoked_at" " FROM sessions WHERE id = :id" ), {"id": new_id}, ).mappings().first() if row is None: # pragma: no cover — insert just succeeded raise RuntimeError("failed to reload just-inserted session row") session = row_to_session(row) cookie_value = self._signer.dumps(raw) return session, cookie_value # ------------------------------------------------------------------ # Lookup # ------------------------------------------------------------------ def lookup(self, cookie_value: Optional[str]) -> Optional[Session]: """Resolve a cookie value to an active :class:`Session`. Returns ``None`` on: - missing cookie - invalid signature - expired signature (we reject if older than ``session_max_days``) - no matching row - session revoked or past its ``expires_at`` All failure modes intentionally return the same ``None`` so callers cannot distinguish them (CWE-200). """ if not cookie_value: return None max_age_s = self._settings.session_max_days * 86400 try: raw: str = self._signer.loads(cookie_value, max_age=max_age_s) except SignatureExpired: _log.info("session_cookie_expired") return None except BadSignature: _log.info("session_cookie_bad_signature") return None token_hash = _sha256(raw) now_iso = datetime.now(timezone.utc).isoformat() with self._engine.connect() as conn: row = conn.execute( text( "SELECT id, user_id, token_hash, created_at, expires_at," " ip, user_agent, revoked_at" " FROM sessions" " WHERE token_hash = :h" " AND revoked_at IS NULL" " AND expires_at > :now" " LIMIT 1" ), {"h": token_hash, "now": now_iso}, ).mappings().first() if row is None: return None return row_to_session(row) # ------------------------------------------------------------------ # Revocation # ------------------------------------------------------------------ def revoke(self, session: Session) -> None: """Mark a session as revoked (soft-delete). The row stays in the DB so the audit trail remains intact; the ``revoked_at`` timestamp is what the ``lookup`` query filters on. """ now_iso = datetime.now(timezone.utc).isoformat() with self._engine.begin() as conn: conn.execute( text( "UPDATE sessions SET revoked_at = :now" " WHERE id = :id AND revoked_at IS NULL" ), {"now": now_iso, "id": session.id}, ) # ------------------------------------------------------------------ # Cookie helpers # ------------------------------------------------------------------ def cookie_params(self) -> dict: """Return kwargs for ``response.set_cookie`` matching our policy. Values: - ``key`` = :data:`COOKIE_NAME` - ``httponly=True`` - ``samesite="lax"`` - ``secure`` = True only in production (plain-HTTP 127.0.0.1 needs ``Secure=False`` in dev) - ``max_age`` = ``session_max_days * 86400`` - ``path="/"`` The caller supplies ``value`` separately. """ return { "key": COOKIE_NAME, "httponly": True, "samesite": "lax", "secure": self._settings.app_env == "production", "max_age": self._settings.session_max_days * 86400, "path": "/", }