chore: phase 6 hardening — CSP/HSTS, access log, docker, backup, CI
Ships the cross-cutting hardening set: - SecurityHeadersMiddleware: per-request nonce-based CSP, HSTS (production only), Referrer-Policy, Permissions-Policy, X-Content-Type-Options, frame-ancestors 'none', form-action 'self'. - AccessLogMiddleware: one http_request INFO event per request (method/path/status/duration_ms/ip/ua). Skips /healthz, redacts /admin/auth/consume/<token> paths, logs 500 + re-raises on downstream exceptions. - Public base.html inline nav-toggle script gets a nonce so it passes strict CSP without relaxing to 'unsafe-inline'. - Dockerfile: non-root app user (uid/gid 10001) + stdlib-only HEALTHCHECK against /healthz. - scripts/backup.sh: sqlite3 .backup + tar data/media with 14-entry retention; host-side cron install documented. - .gitea/workflows/build-image.yml: on push to master / workflow_dispatch, builds and publishes git.sneakygeek.net/ptarrant/chicken_babies_site:latest + sha-<short>, with GIT_COMMIT_SHA threaded as a build-arg so /healthz keeps reporting the right commit in deployed images. - 8 new tests (security headers + access log). Pre-existing dev failures (logo asset rename + RESEND env pollution) remain unchanged; verified not Phase 6 regressions. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
26
app/middleware/__init__.py
Normal file
26
app/middleware/__init__.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""Application-level Starlette middlewares.
|
||||
|
||||
Each middleware is a :class:`starlette.middleware.base.BaseHTTPMiddleware`
|
||||
subclass wired up in :mod:`app.main`. They are kept in their own package
|
||||
(rather than buried in ``app/main.py``) because Phase 6 introduced two
|
||||
cross-cutting middlewares — security headers and access logging — that
|
||||
benefit from isolated unit tests and clear homes for future additions
|
||||
(rate-limit re-shaping, request-id propagation, etc.).
|
||||
|
||||
Public surface:
|
||||
|
||||
- :class:`SecurityHeadersMiddleware` — per-request CSP nonce + strict
|
||||
response headers. See :mod:`app.middleware.security_headers`.
|
||||
- :class:`AccessLogMiddleware` — structured ``http_request`` log line
|
||||
after every response. See :mod:`app.middleware.access_log`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from app.middleware.access_log import AccessLogMiddleware
|
||||
from app.middleware.security_headers import SecurityHeadersMiddleware
|
||||
|
||||
__all__ = [
|
||||
"AccessLogMiddleware",
|
||||
"SecurityHeadersMiddleware",
|
||||
]
|
||||
113
app/middleware/access_log.py
Normal file
113
app/middleware/access_log.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""Structured access-log middleware.
|
||||
|
||||
Emits a single ``http_request`` INFO event per request, capturing the
|
||||
HTTP verb, path, status code, wall-clock duration, client IP, and a
|
||||
truncated user-agent. The goal is a compact but auditable trail of
|
||||
production traffic without the noise of a traditional access log and
|
||||
without ever writing secrets to disk.
|
||||
|
||||
Key design choices:
|
||||
|
||||
- **No bodies, no query strings.** We deliberately skip query strings
|
||||
entirely to avoid leaking tokens that might end up there in future
|
||||
(the magic-link consume route keeps its token in the path, so we
|
||||
redact that explicitly below).
|
||||
- **Magic-link path redaction.** Requests to
|
||||
``/admin/auth/consume/{token}`` have the token segment replaced with
|
||||
``<redacted>`` before logging. ``CLAUDE.md`` forbids logging raw
|
||||
tokens anywhere.
|
||||
- **Skip ``/healthz``.** Compose / Docker health probes hit this every
|
||||
few seconds. Logging each one drowns out real traffic.
|
||||
- **Exceptions still get logged.** If the downstream handler raises,
|
||||
we record a ``status_code=500`` entry before re-raising so no failed
|
||||
request vanishes silently.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import structlog
|
||||
from fastapi import Request
|
||||
from fastapi.responses import Response
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
# Prefix used to recognise magic-link consume URLs whose trailing token
|
||||
# segment must be redacted before logging.
|
||||
_CONSUME_PREFIX: str = "/admin/auth/consume/"
|
||||
|
||||
# Path we skip entirely to reduce health-probe log noise.
|
||||
_SKIP_PATH: str = "/healthz"
|
||||
|
||||
# User-agent strings are unbounded; cap to 256 chars so a hostile client
|
||||
# can't bloat log lines to arbitrary size.
|
||||
_UA_MAX: int = 256
|
||||
|
||||
|
||||
def _redact_path(path: str) -> str:
|
||||
"""Return ``path`` with magic-link tokens replaced by ``<redacted>``.
|
||||
|
||||
The consume URL is ``/admin/auth/consume/{token}``; everything after
|
||||
the prefix is swapped out. We preserve the prefix so log readers can
|
||||
still see which route was hit.
|
||||
"""
|
||||
if path.startswith(_CONSUME_PREFIX):
|
||||
return _CONSUME_PREFIX + "<redacted>"
|
||||
return path
|
||||
|
||||
|
||||
class AccessLogMiddleware(BaseHTTPMiddleware):
|
||||
"""Log one ``http_request`` event per completed request.
|
||||
|
||||
Installed outermost in :mod:`app.main` so the timing measurement
|
||||
covers the entire downstream middleware stack, including security
|
||||
headers and CSRF cookie work.
|
||||
"""
|
||||
|
||||
async def dispatch(self, request: Request, call_next):
|
||||
"""Time the request, log a structured event, reraise on failure."""
|
||||
path: str = request.url.path
|
||||
|
||||
# Early exit for health probes — don't even record timing.
|
||||
if path == _SKIP_PATH:
|
||||
return await call_next(request)
|
||||
|
||||
method: str = request.method
|
||||
client_ip: str = request.client.host if request.client else ""
|
||||
user_agent: str = request.headers.get("user-agent", "")[:_UA_MAX]
|
||||
redacted_path: str = _redact_path(path)
|
||||
|
||||
start: float = time.monotonic()
|
||||
try:
|
||||
response: Response = await call_next(request)
|
||||
except Exception:
|
||||
duration_ms = int((time.monotonic() - start) * 1000)
|
||||
# Record the failure before re-raising so unhandled exceptions
|
||||
# don't vanish from the log. Status is synthetic (500) because
|
||||
# the framework hasn't written a response yet at this point.
|
||||
logger.info(
|
||||
"http_request",
|
||||
method=method,
|
||||
path=redacted_path,
|
||||
status_code=500,
|
||||
duration_ms=duration_ms,
|
||||
client_ip=client_ip,
|
||||
user_agent=user_agent,
|
||||
)
|
||||
raise
|
||||
|
||||
duration_ms = int((time.monotonic() - start) * 1000)
|
||||
logger.info(
|
||||
"http_request",
|
||||
method=method,
|
||||
path=redacted_path,
|
||||
status_code=response.status_code,
|
||||
duration_ms=duration_ms,
|
||||
client_ip=client_ip,
|
||||
user_agent=user_agent,
|
||||
)
|
||||
return response
|
||||
122
app/middleware/security_headers.py
Normal file
122
app/middleware/security_headers.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Security-headers middleware.
|
||||
|
||||
Installs a strict-ish set of security response headers on every outgoing
|
||||
response — notably a nonce-based ``Content-Security-Policy`` that locks
|
||||
inline ``<script>`` usage to per-request tokens. Templates read the
|
||||
nonce from ``request.state.csp_nonce`` and stamp it onto whichever
|
||||
``<script>`` blocks need to run.
|
||||
|
||||
Modelled on :class:`app.main.CSRFCookieMiddleware`: the constructor
|
||||
takes the ASGI app plus any configuration it needs by keyword;
|
||||
``dispatch`` is async and always returns the downstream response.
|
||||
|
||||
Header set (matches ``docs/security.md`` + Phase 6 brief):
|
||||
|
||||
- ``Content-Security-Policy`` — nonce-based ``script-src`` that also
|
||||
allowlists hCaptcha; ``frame-ancestors 'none'`` replaces the legacy
|
||||
``X-Frame-Options: DENY``.
|
||||
- ``Strict-Transport-Security`` — **only in production**; the dev
|
||||
server is reached over plain HTTP at ``http://127.0.0.1:8000`` and
|
||||
HSTS would make that session permanently HTTPS-only for the browser.
|
||||
- ``X-Content-Type-Options: nosniff``
|
||||
- ``Referrer-Policy: strict-origin-when-cross-origin``
|
||||
- ``Permissions-Policy`` — disable every sensor/device API we do not
|
||||
use (defense in depth for any future supply-chain compromise).
|
||||
- ``Cross-Origin-Opener-Policy: same-origin``
|
||||
|
||||
The nonce is generated fresh per request (``secrets.token_urlsafe(16)``
|
||||
→ 128 bits, URL-safe) and stored on ``request.state.csp_nonce`` before
|
||||
the downstream handler runs, so Jinja templates can read it via the
|
||||
implicit ``request`` context variable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
|
||||
from fastapi import Request
|
||||
from fastapi.responses import Response
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
|
||||
|
||||
# --- CSP template ---------------------------------------------------------
|
||||
# Note the ``{nonce}`` placeholder: we format per-request so the token is
|
||||
# unique to each response. ``style-src 'unsafe-inline'`` is a known
|
||||
# compromise — we don't emit our own inline styles, but third-party
|
||||
# widgets (hCaptcha) and some HTML attribute defaults want it. Locking
|
||||
# this down is a future task.
|
||||
_CSP_TEMPLATE: str = (
|
||||
"default-src 'self'; "
|
||||
"script-src 'self' 'nonce-{nonce}' https://js.hcaptcha.com https://*.hcaptcha.com; "
|
||||
"style-src 'self' 'unsafe-inline'; "
|
||||
"img-src 'self' data:; "
|
||||
"font-src 'self'; "
|
||||
"connect-src 'self' https://*.hcaptcha.com; "
|
||||
"frame-src https://*.hcaptcha.com https://newassets.hcaptcha.com; "
|
||||
"frame-ancestors 'none'; "
|
||||
"base-uri 'self'; "
|
||||
"form-action 'self'"
|
||||
)
|
||||
|
||||
|
||||
# Static response headers that do not depend on per-request state. Kept
|
||||
# as a module-level dict so we pay the allocation cost once at import
|
||||
# time and just iterate on every response.
|
||||
_STATIC_HEADERS: dict[str, str] = {
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
"Referrer-Policy": "strict-origin-when-cross-origin",
|
||||
"Permissions-Policy": (
|
||||
"accelerometer=(), camera=(), geolocation=(), gyroscope=(), "
|
||||
"magnetometer=(), microphone=(), payment=(), usb=()"
|
||||
),
|
||||
"Cross-Origin-Opener-Policy": "same-origin",
|
||||
}
|
||||
|
||||
|
||||
# HSTS is production-only. One year, subdomains included; no ``preload``
|
||||
# directive (that requires submitting the apex to the HSTS preload list,
|
||||
# which is a separate operational step).
|
||||
_HSTS_VALUE: str = "max-age=31536000; includeSubDomains"
|
||||
|
||||
|
||||
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
|
||||
"""Attach CSP + friends to every response.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
app:
|
||||
The ASGI application. ``BaseHTTPMiddleware`` stores this.
|
||||
production:
|
||||
When ``True`` the middleware also emits ``Strict-Transport-Security``.
|
||||
Passed explicitly (rather than read from :mod:`app.config` here)
|
||||
so the middleware remains unit-testable without loading settings.
|
||||
"""
|
||||
|
||||
def __init__(self, app, *, production: bool) -> None:
|
||||
"""Store the production flag; the app handle is owned by the base class."""
|
||||
super().__init__(app)
|
||||
self._production: bool = production
|
||||
|
||||
async def dispatch(self, request: Request, call_next):
|
||||
"""Mint a nonce, run the downstream handler, stamp the headers.
|
||||
|
||||
The nonce is attached to ``request.state`` *before* ``call_next``
|
||||
so any template rendered by the route handler can read it. The
|
||||
CSP header itself is added after the response is produced so it
|
||||
rides on every path (HTML, JSON, static bypass, error pages).
|
||||
"""
|
||||
# 128 bits of entropy, URL-safe base64 — plenty for CSP nonce use.
|
||||
nonce: str = secrets.token_urlsafe(16)
|
||||
request.state.csp_nonce = nonce
|
||||
|
||||
response: Response = await call_next(request)
|
||||
|
||||
response.headers["Content-Security-Policy"] = _CSP_TEMPLATE.format(
|
||||
nonce=nonce
|
||||
)
|
||||
for key, value in _STATIC_HEADERS.items():
|
||||
response.headers[key] = value
|
||||
if self._production:
|
||||
response.headers["Strict-Transport-Security"] = _HSTS_VALUE
|
||||
|
||||
return response
|
||||
Reference in New Issue
Block a user