feat: phase 4 admin CMS — dashboard, editor, media, CSRF

Head Hen CMS end-to-end: dashboard lists all posts (drafts + published),
Markdown editor with live preview + drag-drop image upload, Pillow media
pipeline re-encoding every upload to JPEG, post CRUD + publish toggle +
hard delete, About page edit, and double-submit CSRF cookie enforced on
every admin mutating endpoint (Phase 3's TODO markers resolved).

Slug auto-generated on create and server-locked once a post has been
published. Unpublish preserves `published_at` so re-publish keeps
original date ordering. Every admin write invalidates the read-side
Post/Page TTL caches and records an `auth_events` audit row.

CSRF middleware is narrow by design — issues/refreshes the `cb_csrf`
cookie only on `GET /admin*`, and mutating endpoints opt in via
`require_csrf_form` or `require_csrf_header` Depends. Public routes,
healthz, and pre-auth login stay untouched.

64 new tests cover slugs, CSRF, media, admin posts/pages services, and
end-to-end CMS routes. Tests never mock the DB — real temp SQLite files
per the CLAUDE.md mandate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-21 20:42:01 -05:00
parent 76875a455e
commit 9a8506970c
30 changed files with 3831 additions and 74 deletions

124
app/services/admin_pages.py Normal file
View File

@@ -0,0 +1,124 @@
"""Admin-side (write) page service.
The public site only has one editable page — "About" — so this
service is intentionally narrower than :class:`AdminPostsService`. The
slug is a fixed literal (``"about"``) and cannot be changed through
the admin. Only the title and body may be edited.
Every write:
- re-renders Markdown → sanitized HTML into ``body_html_cached`` so
the public read path stays a single SELECT.
- bumps ``updated_at``.
- emits an ``AuditService`` ``page_updated`` event.
- invalidates the public :class:`PageService` (and, defensively, the
:class:`PostService`) cache so the next request sees the new copy.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
import structlog
from sqlalchemy import Engine, text
from app.models.entities import Page
from app.models.mappers import row_to_page
from app.services.audit import AuditService
from app.services.markdown import MarkdownService
from app.services.pages import PageService
from app.services.posts import PostService
# Module-level structured logger.
_log = structlog.get_logger(__name__)
# The single editable page's slug. Hard-coded here (not injected) so
# this service cannot be pointed at a different slug: every query in
# this module binds this constant as its ``:slug`` parameter.
ABOUT_SLUG: str = "about"
class AdminPagesService:
    """Persist admin edits to the site's one editable page ("About").

    All collaborators are injected and held by reference: the shared
    SQLAlchemy engine, the Markdown renderer, the two public read-side
    services (whose caches are dropped after a write) and the audit
    service.
    """

    # Statement text lives beside the class so each method reads as
    # "bind parameters, execute" instead of a wall of inline SQL.
    _SELECT_ABOUT_SQL = (
        "SELECT id, slug, title, body_md, body_html_cached,"
        " updated_at, published"
        " FROM pages WHERE slug = :slug LIMIT 1"
    )
    _UPDATE_ABOUT_SQL = (
        "UPDATE pages"
        " SET title = :title, body_md = :body_md,"
        " body_html_cached = :body_html,"
        " updated_at = :updated_at"
        " WHERE slug = :slug"
    )

    def __init__(
        self,
        engine: Engine,
        markdown: MarkdownService,
        page_service: PageService,
        post_service: PostService,
        audit: AuditService,
    ) -> None:
        """Keep references to the injected collaborators."""
        self._engine: Engine = engine
        self._markdown: MarkdownService = markdown
        self._page_service: PageService = page_service
        self._post_service: PostService = post_service
        self._audit: AuditService = audit

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------
    def get_about(self) -> Optional[Page]:
        """Load the About row; ``None`` when no such row exists."""
        with self._engine.connect() as conn:
            result = conn.execute(
                text(self._SELECT_ABOUT_SQL),
                {"slug": ABOUT_SLUG},
            )
            record = result.mappings().first()
        if record is None:
            return None
        return row_to_page(record)

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------
    def update_about(
        self,
        *,
        title: str,
        body_md: str,
        actor_user_id: int,
    ) -> Optional[Page]:
        """Replace the About page's title and body.

        The slug is never touched. Returns the freshly re-read page, or
        ``None`` (without writing anything) when the row is missing.
        """
        if self.get_about() is None:
            return None

        stripped_title = (title or "").strip()
        markdown_source = body_md or ""
        rendered_html = self._markdown.render(markdown_source)
        stamp = datetime.now(timezone.utc).isoformat()

        bind_values = {
            "title": stripped_title,
            "body_md": markdown_source,
            "body_html": rendered_html,
            "updated_at": stamp,
            "slug": ABOUT_SLUG,
        }
        with self._engine.begin() as conn:
            conn.execute(text(self._UPDATE_ABOUT_SQL), bind_values)

        self._audit.record(
            "page_updated",
            user_id=actor_user_id,
            detail={"slug": ABOUT_SLUG},
        )
        # Drop both read-side caches so the next public request sees
        # the new copy.
        self._page_service.invalidate_all()
        self._post_service.invalidate_all()
        return self.get_about()

383
app/services/admin_posts.py Normal file
View File

@@ -0,0 +1,383 @@
"""Admin-side (write) post service.
Mirrors the shape of :class:`app.services.posts.PostService` but for
the admin CRUD path. Responsibilities:
- create / update / delete posts
- toggle publish state
- auto-generate unique slugs from titles on create (draft only)
- re-render Markdown to ``body_html_cached`` on every write
- audit every write via :class:`AuditService` using descriptive
``event_type`` strings
- invalidate both :class:`PostService` and :class:`PageService` caches
so the public site reflects the change immediately
All writes use parameterized SQL (``text(":bind")``). No user input is
ever interpolated into a query string.
The service treats ``author_user_id`` as an immutable field: once a
post is created, edits do NOT reassign authorship, even if a different
admin saves the edit. This matches the single-author ("Head Hen")
reality of the site.
Slug lock-on-publish
--------------------
The slug is generated once, when the post is created, and the update
path never regenerates it — not even on a title change while the post
is still a draft. Once a post has been published even once, the slug
is therefore locked server-side: callers cannot change it via the
update path, even if they later unpublish the post. This preserves
any inbound links that went live while the post was published.
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
import structlog
from sqlalchemy import Engine, text
from app.models.entities import Post, PostStatus
from app.models.mappers import row_to_post
from app.services.audit import AuditService
from app.services.markdown import MarkdownService
from app.services.pages import PageService
from app.services.posts import PostService
from app.services.slugs import ensure_unique, slugify
# Module-level structured logger for the admin post write path.
_log = structlog.get_logger(__name__)
class AdminPostsService:
    """Write-side orchestration for blog posts.

    Parameters
    ----------
    engine:
        Shared SQLAlchemy engine. Never opens its own.
    markdown:
        Shared :class:`MarkdownService` used to re-render on every
        write so the public read path pays only a single SELECT.
    post_service:
        The public read-side service. Invalidated after every write so
        the home page reflects the change immediately.
    page_service:
        Same rationale — a post edit doesn't change page content but
        we conservatively invalidate to keep cache logic uniform.
    audit:
        :class:`AuditService` for descriptive admin write events.
    """

    # How many times ``create`` re-derives a slug and retries the
    # INSERT after losing a race on the UNIQUE slug index.
    _MAX_INSERT_ATTEMPTS = 3

    # One definition of the column list so every read feeds
    # ``row_to_post`` the same shape.
    _SELECT_POSTS_SQL = (
        "SELECT id, slug, title, body_md, body_html_cached,"
        " status, published_at, updated_at, author_user_id"
        " FROM posts"
    )
    _SELECT_ALL_SQL = _SELECT_POSTS_SQL + " ORDER BY updated_at DESC, id DESC"
    _SELECT_BY_ID_SQL = _SELECT_POSTS_SQL + " WHERE id = :id LIMIT 1"
    _INSERT_SQL = (
        "INSERT INTO posts"
        " (slug, title, body_md, body_html_cached, status,"
        " published_at, updated_at, author_user_id)"
        " VALUES (:slug, :title, :body_md, :body_html,"
        " :status, :published_at, :updated_at, :author_id)"
    )
    _UPDATE_CONTENT_SQL = (
        "UPDATE posts"
        " SET title = :title, body_md = :body_md,"
        " body_html_cached = :body_html,"
        " updated_at = :updated_at"
        " WHERE id = :id"
    )
    _UPDATE_STATUS_SQL = (
        "UPDATE posts"
        " SET status = :status,"
        " published_at = :published_at,"
        " updated_at = :updated_at"
        " WHERE id = :id"
    )
    _DELETE_SQL = "DELETE FROM posts WHERE id = :id"
    _SLUG_EXISTS_SQL = "SELECT 1 FROM posts WHERE slug = :s LIMIT 1"

    def __init__(
        self,
        engine: Engine,
        markdown: MarkdownService,
        post_service: PostService,
        page_service: PageService,
        audit: AuditService,
    ) -> None:
        """Keep references to the injected collaborators."""
        self._engine: Engine = engine
        self._markdown: MarkdownService = markdown
        self._post_service: PostService = post_service
        self._page_service: PageService = page_service
        self._audit: AuditService = audit

    # ------------------------------------------------------------------
    # Reads (admin dashboard)
    # ------------------------------------------------------------------
    def list_all(self) -> list[Post]:
        """Return every post (drafts and published), newest-updated-first."""
        with self._engine.connect() as conn:
            rows = conn.execute(text(self._SELECT_ALL_SQL)).mappings().all()
        return [row_to_post(row) for row in rows]

    def get_by_id(self, post_id: int) -> Optional[Post]:
        """Return the :class:`Post` for ``post_id`` or ``None`` if absent."""
        with self._engine.connect() as conn:
            row = (
                conn.execute(text(self._SELECT_BY_ID_SQL), {"id": post_id})
                .mappings()
                .first()
            )
        return row_to_post(row) if row is not None else None

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------
    def create(
        self,
        *,
        title: str,
        body_md: str,
        status: PostStatus,
        author_id: int,
    ) -> Post:
        """Insert a new post row and return the loaded :class:`Post`.

        Flow
        ----
        1. Slugify the title and pick a slug that is currently free.
           The existence check and the INSERT are separate statements,
           so a concurrent create can still claim the same slug first;
           in that case the UNIQUE index rejects our INSERT and we
           re-derive the slug and retry (bounded by
           ``_MAX_INSERT_ATTEMPTS``).
        2. Render Markdown via the injected :class:`MarkdownService`.
        3. If ``status`` is ``PUBLISHED`` stamp ``published_at = now``;
           otherwise leave NULL.
        4. Insert and reload the row.
        5. Audit ``post_created`` (and ``post_published`` when the
           initial status is published).
        6. Invalidate caches.

        Raises
        ------
        sqlalchemy.exc.IntegrityError
            When the INSERT keeps violating a constraint after all
            retry attempts (e.g. a non-slug constraint).
        """
        clean_title = (title or "").strip()
        clean_body = body_md or ""
        base_slug = slugify(clean_title)
        body_html = self._markdown.render(clean_body)
        now_iso = datetime.now(timezone.utc).isoformat()
        values = {
            "title": clean_title,
            "body_md": clean_body,
            "body_html": body_html,
            "status": status.value,
            "published_at": now_iso if status is PostStatus.PUBLISHED else None,
            "updated_at": now_iso,
            "author_id": author_id,
        }

        post = self._insert_with_unique_slug(base_slug, values)

        self._audit.record(
            "post_created",
            user_id=author_id,
            detail={"post_id": post.id, "slug": post.slug, "status": post.status.value},
        )
        if post.status is PostStatus.PUBLISHED:
            self._audit.record(
                "post_published",
                user_id=author_id,
                detail={"post_id": post.id, "slug": post.slug},
            )
        self._invalidate_caches()
        return post

    def update(
        self,
        post_id: int,
        *,
        title: str,
        body_md: str,
        actor_user_id: int,
    ) -> Optional[Post]:
        """Update a post's title + body. Return the refreshed :class:`Post`.

        Only ``title``, ``body_md``, ``body_html_cached`` and
        ``updated_at`` are written: the slug, ``author_user_id``,
        ``status`` and ``published_at`` are left untouched (publishing
        happens via :meth:`toggle_publish`). Returns ``None`` without
        writing when ``post_id`` does not exist.
        """
        existing = self.get_by_id(post_id)
        if existing is None:
            return None

        clean_title = (title or "").strip()
        clean_body = body_md or ""
        body_html = self._markdown.render(clean_body)
        now_iso = datetime.now(timezone.utc).isoformat()

        with self._engine.begin() as conn:
            conn.execute(
                text(self._UPDATE_CONTENT_SQL),
                {
                    "title": clean_title,
                    "body_md": clean_body,
                    "body_html": body_html,
                    "updated_at": now_iso,
                    "id": post_id,
                },
            )

        self._audit.record(
            "post_updated",
            user_id=actor_user_id,
            detail={"post_id": post_id, "slug": existing.slug},
        )
        self._invalidate_caches()
        return self.get_by_id(post_id)

    def delete(self, post_id: int, *, actor_user_id: int) -> bool:
        """Delete a post row. Return True if the post existed.

        Only the ``posts`` row is removed; this method does not touch
        media rows or files.
        """
        existing = self.get_by_id(post_id)
        if existing is None:
            return False

        with self._engine.begin() as conn:
            conn.execute(text(self._DELETE_SQL), {"id": post_id})

        self._audit.record(
            "post_deleted",
            user_id=actor_user_id,
            detail={"post_id": post_id, "slug": existing.slug},
        )
        self._invalidate_caches()
        return True

    def toggle_publish(self, post_id: int, *, actor_user_id: int) -> Optional[Post]:
        """Flip draft ↔ published. Return the updated post, or ``None``.

        - Draft → Published: ``published_at`` is set to now ONLY if it
          was previously NULL; a re-publish keeps the original
          timestamp so public ordering stays stable.
        - Published → Draft: status flips, ``published_at`` is
          preserved.
        """
        existing = self.get_by_id(post_id)
        if existing is None:
            return None

        now_iso = datetime.now(timezone.utc).isoformat()
        publishing = existing.status is not PostStatus.PUBLISHED

        # An existing publish timestamp always wins; only a first-ever
        # publish stamps a new one.
        published_at_iso: Optional[str]
        if existing.published_at is not None:
            published_at_iso = existing.published_at.isoformat()
        elif publishing:
            published_at_iso = now_iso
        else:
            published_at_iso = None

        if publishing:
            new_status = PostStatus.PUBLISHED
            event_type = "post_published"
        else:
            new_status = PostStatus.DRAFT
            event_type = "post_unpublished"

        with self._engine.begin() as conn:
            conn.execute(
                text(self._UPDATE_STATUS_SQL),
                {
                    "status": new_status.value,
                    "published_at": published_at_iso,
                    "updated_at": now_iso,
                    "id": post_id,
                },
            )

        self._audit.record(
            event_type,
            user_id=actor_user_id,
            detail={"post_id": post_id, "slug": existing.slug},
        )
        self._invalidate_caches()
        return self.get_by_id(post_id)

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    def _insert_with_unique_slug(self, base_slug: str, values: dict) -> Post:
        """Insert ``values`` under a free slug, retrying lost slug races."""
        # Local import: sqlalchemy is already a dependency of this
        # module; importing here keeps the fix inside this class.
        from sqlalchemy.exc import IntegrityError

        for attempt in range(1, self._MAX_INSERT_ATTEMPTS + 1):
            candidate = ensure_unique(base_slug, self._slug_exists)
            try:
                return self._insert_row({**values, "slug": candidate})
            except IntegrityError:
                # Most likely another create claimed ``candidate``
                # between our check and our INSERT. Re-derive and try
                # again; give up (re-raise) after the last attempt so a
                # genuine constraint problem still surfaces.
                if attempt == self._MAX_INSERT_ATTEMPTS:
                    raise
                _log.warning(
                    "post_slug_collision_retry",
                    slug=candidate,
                    attempt=attempt,
                )
        # The loop always returns or raises.
        raise RuntimeError("post insert loop exited without a result")  # pragma: no cover

    def _insert_row(self, params: dict) -> Post:
        """Run the INSERT and reload the new row in one transaction."""
        with self._engine.begin() as conn:
            result = conn.execute(text(self._INSERT_SQL), params)
            new_id = int(result.lastrowid)  # type: ignore[arg-type]
            row = (
                conn.execute(text(self._SELECT_BY_ID_SQL), {"id": new_id})
                .mappings()
                .first()
            )
            if row is None:  # pragma: no cover — just inserted
                raise RuntimeError("failed to reload just-inserted post row")
        return row_to_post(row)

    def _slug_exists(self, candidate: str) -> bool:
        """Return True if a row with ``slug = candidate`` is already present."""
        with self._engine.connect() as conn:
            row = conn.execute(
                text(self._SLUG_EXISTS_SQL),
                {"s": candidate},
            ).first()
        return row is not None

    def _invalidate_caches(self) -> None:
        """Drop both the post and page read-side caches."""
        self._post_service.invalidate_all()
        self._page_service.invalidate_all()
def get_admin_posts_service(request):  # pragma: no cover — trivial
    """FastAPI dependency: hand back the shared admin posts service.

    The instance is read from ``request.app.state``; this function only
    looks it up.
    """
    app_state = request.app.state
    return app_state.admin_posts_service

167
app/services/csrf.py Normal file
View File

@@ -0,0 +1,167 @@
"""CSRF double-submit cookie service.
Protects admin-write endpoints against cross-site request forgery by
requiring a signed token to be submitted BOTH as a cookie and as a
form field / header. An attacker can forge requests but cannot read
the cookie (SameSite=Lax keeps the browser from attaching it to
cross-site POSTs and subrequests, and even when the browser does send
it, cross-site JS still cannot read cookies on this origin). Matching
the submitted value to the cookie value then proves the request
originated from our own pages.
Design
------
- The cookie stores a signed opaque nonce. Signing prevents a malicious
ad iframe (or any JS on a non-origin page) from producing a cookie
value that would later match a crafted form submission.
- The nonce itself is 256-bit (``secrets.token_urlsafe(32)``), generated
per-browser on first admin GET and reused for the session. Rotating
per request would invalidate any still-open admin tab on every nav,
which the small-scale admin UX cannot tolerate.
- Verification unsigns the submitted token and compares the raw nonce
to the raw nonce unsigned from the cookie using :func:`hmac.compare_digest`
(constant-time) to foreclose timing side channels.
- The cookie is ``HttpOnly=False`` so the minimal admin JS (live
preview, upload) can read it to set the ``X-CSRF-Token`` header on
fetch requests. This is the conventional double-submit cookie setup
— the XSS risk is already mitigated by the Markdown sanitizer and
the session cookie remains HttpOnly.
The service is a small collaborator: it does not know about FastAPI
routes, request objects, or templates. The :mod:`app.dependencies.csrf`
module wraps the verify call in a FastAPI dependency.
"""
from __future__ import annotations
import hmac
import secrets
from typing import Optional
import structlog
from itsdangerous import BadSignature, URLSafeTimedSerializer
# Module-level structured logger; used to record rejected signatures.
_log = structlog.get_logger(__name__)
# Cookie name kept here as a module-level constant so routes,
# dependencies, and templates stay in sync.
CSRF_COOKIE_NAME: str = "cb_csrf"
# Default max age in seconds (30 days). Used both as the cookie's
# ``max_age`` and as the signature TTL when unsigning.
_DEFAULT_MAX_AGE_SEC: int = 30 * 86400


class CSRFService:
    """Issue and verify double-submit CSRF tokens.

    Parameters
    ----------
    signer:
        Pre-built :class:`itsdangerous.URLSafeTimedSerializer`. The
        caller is responsible for constructing it with
        ``salt="csrf"`` so a session-cookie token can never be
        replayed as a CSRF token and vice-versa.
    production:
        When True, the issued cookie carries the ``Secure`` flag. Dev
        (plain-HTTP 127.0.0.1) needs it off or the browser drops the
        cookie entirely.
    max_age_sec:
        Lifetime, in seconds, of both the cookie and the signature.
    """

    def __init__(
        self,
        signer: URLSafeTimedSerializer,
        *,
        production: bool = False,
        max_age_sec: int = _DEFAULT_MAX_AGE_SEC,
    ) -> None:
        """Store the signer and cookie-policy flags by reference."""
        self._signer: URLSafeTimedSerializer = signer
        self._production: bool = production
        self._max_age_sec: int = int(max_age_sec)

    # ------------------------------------------------------------------
    # Issue
    # ------------------------------------------------------------------
    def issue(self, existing_cookie: Optional[str] = None) -> tuple[str, str]:
        """Return ``(token, cookie_value)`` — reuse or mint as appropriate.

        If ``existing_cookie`` unsigns to a valid nonce (still within
        TTL) that nonce is re-signed, so tokens issued earlier in the
        same browser keep verifying. Otherwise a fresh 256-bit nonce is
        minted. The two returned values are the SAME signed string —
        that is the "double submit" contract.
        """
        raw = self._unsign_or_none(existing_cookie)
        if raw is None:
            raw = secrets.token_urlsafe(32)
        signed = self._signer.dumps(raw)
        return signed, signed

    # ------------------------------------------------------------------
    # Verify
    # ------------------------------------------------------------------
    def verify(
        self,
        *,
        cookie_value: Optional[str],
        submitted: Optional[str],
    ) -> bool:
        """Return True iff cookie + submitted token unseal to the same nonce.

        Both strings must unsign cleanly; a missing value or a bad
        signature on either side fails closed. The nonces are compared
        in constant time.
        """
        if not cookie_value or not submitted:
            return False
        cookie_raw = self._unsign_or_none(cookie_value)
        submitted_raw = self._unsign_or_none(submitted)
        if cookie_raw is None or submitted_raw is None:
            return False
        # compare_digest only accepts ASCII ``str``; comparing encoded
        # bytes keeps the check total instead of raising TypeError on
        # an unexpected payload. ``surrogatepass`` keeps the encode
        # itself from raising on lone surrogates.
        return hmac.compare_digest(
            cookie_raw.encode("utf-8", "surrogatepass"),
            submitted_raw.encode("utf-8", "surrogatepass"),
        )

    # ------------------------------------------------------------------
    # Cookie helpers
    # ------------------------------------------------------------------
    def cookie_params(self) -> dict:
        """Return kwargs for ``response.set_cookie`` matching our CSRF policy.

        ``httponly`` is False so the admin JS can read the cookie and
        echo it in a request header; ``secure`` follows the
        ``production`` flag.
        """
        return {
            "key": CSRF_COOKIE_NAME,
            "httponly": False,
            "samesite": "lax",
            "secure": self._production,
            "max_age": self._max_age_sec,
            "path": "/",
        }

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    def _unsign_or_none(self, value: Optional[str]) -> Optional[str]:
        """Return the raw nonce, or ``None`` when ``value`` is unusable.

        Centralizes the "fail closed" contract: an empty value, a bad
        or expired signature, or a payload that is not a non-empty
        string all yield ``None``.
        """
        if not value:
            return None
        try:
            raw = self._signer.loads(value, max_age=self._max_age_sec)
        except BadSignature:
            _log.info("csrf_bad_signature")
            return None
        # We only ever sign non-empty ``str`` nonces; anything else is
        # not one of ours and must not reach the comparison.
        if not isinstance(raw, str) or not raw:
            return None
        return raw

323
app/services/media.py Normal file
View File

@@ -0,0 +1,323 @@
"""Image upload pipeline: validate → re-encode → store → record.
Every admin image upload passes through this service. The contract is
strict on purpose — the site serves user-editable HTML (via the
sanitizer) plus the bytes that flow through here, so anything we miss
becomes XSS / RCE surface area.
Steps in :meth:`MediaService.save_upload`:
1. **Size cap** — reject anything over 8 MB at the bytes level
(before decoding). We read the full buffer so we can hash and
re-encode it; streaming would complicate Pillow's decode path and
upload volumes are tiny.
2. **Magic-byte check** — :mod:`python-magic` inspects the first
2048 bytes and yields a MIME type. Anything not in our allowlist
(``image/jpeg``, ``image/png``, ``image/webp``) is rejected.
Notably, ``image/gif`` is NOT allowed — animated GIFs have a long
history of ambiguous / abuse-friendly encodings.
3. **Pillow decode** — open via :func:`PIL.Image.open` on a
:class:`io.BytesIO` wrapper. Call ``.verify()`` on a dedicated copy
(it consumes the stream), then re-open for the actual encode path.
Reject anything larger than 10000 px per side as a defense against
decompression bombs.
4. **Re-encode to JPEG** — always JPEG. Strip metadata by reopening
into a clean :class:`PIL.Image.Image`; flatten alpha on a white
background so transparent PNG / WebP images don't render as black.
5. **Store** — write to ``<media_root>/<yyyy>/<mm>/<random>.jpg`` where
the random component is :func:`secrets.token_urlsafe(16)`. The
client-supplied filename is kept only in the DB row's
``original_filename`` for display; it is NEVER used to build a
filesystem path.
6. **DB row** — insert a :class:`Media` row. Return the loaded
dataclass.
"""
from __future__ import annotations
import io
import secrets
from datetime import datetime, timezone
from pathlib import Path
from typing import Final, Optional
import structlog
from PIL import Image, UnidentifiedImageError
from sqlalchemy import Engine, text
from app.models.entities import Media
from app.models.mappers import row_to_media
from app.services.audit import AuditService
# Module-level structured logger.
_log = structlog.get_logger(__name__)
# Upper bound on the raw upload bytes (8 MiB). Checked against
# ``len(data)`` before any decoding happens.
MAX_UPLOAD_BYTES: Final[int] = 8 * 1024 * 1024
# Maximum decoded dimension — reject any image wider or taller than
# this as a lightweight defense against decompression bombs.
MAX_PIXEL_DIMENSION: Final[int] = 10_000
# MIME types accepted from the magic-byte sniff. We always re-encode
# to JPEG regardless of input.
_ACCEPTED_MIME: Final[frozenset[str]] = frozenset(
    {"image/jpeg", "image/png", "image/webp"}
)
# Output quality passed to Pillow's JPEG encoder.
_JPEG_QUALITY: Final[int] = 85
class MediaRejectedError(Exception):
    """Signal that an upload was refused by one of the validation steps.

    The exception message is intended to be shown to the admin in the
    editor, so it should stay generic and carry no implementation
    detail.
    """
class MediaService:
    """Validate and store admin-uploaded images.

    Parameters
    ----------
    engine:
        Shared SQLAlchemy engine.
    media_root:
        Filesystem directory under which uploads live (a
        ``<yyyy>/<mm>/`` partition is appended per upload).
    audit:
        :class:`AuditService` for the ``media_uploaded`` event.
    public_prefix:
        URL prefix under which stored files are addressed by
        :meth:`public_url`. Normalized to a single leading slash and no
        trailing slash. Defaults to ``/media``.
    """

    _INSERT_SQL = (
        "INSERT INTO media"
        " (filename, original_filename, content_type,"
        " size_bytes, stored_path, alt_text, uploaded_by,"
        " uploaded_at)"
        " VALUES (:filename, :original_filename, :content_type,"
        " :size_bytes, :stored_path, :alt_text, :uploaded_by,"
        " :uploaded_at)"
    )
    _SELECT_BY_ID_SQL = (
        "SELECT id, filename, original_filename, content_type,"
        " size_bytes, stored_path, alt_text, uploaded_by,"
        " uploaded_at"
        " FROM media WHERE id = :id"
    )

    def __init__(
        self,
        engine: Engine,
        media_root: str,
        audit: AuditService,
        *,
        public_prefix: str = "/media",
    ) -> None:
        """Store collaborators and normalize the public URL prefix."""
        self._engine: Engine = engine
        self._media_root: Path = Path(media_root)
        # Normalize to no trailing slash — we always join with "/<yyyy>/..."
        self._public_prefix: str = "/" + public_prefix.strip("/")
        self._audit: AuditService = audit

    # ------------------------------------------------------------------
    # save_upload
    # ------------------------------------------------------------------
    def save_upload(
        self,
        *,
        original_filename: str,
        data: bytes,
        uploaded_by: int,
        alt_text: str = "",
    ) -> Media:
        """Validate + re-encode + persist a new media upload.

        Parameters
        ----------
        original_filename:
            The filename the client submitted. Stored in the DB row
            only; NEVER used to build a filesystem path.
        data:
            Raw upload bytes. Must be non-empty and at most
            :data:`MAX_UPLOAD_BYTES`.
        uploaded_by:
            Id of the user performing the upload.
        alt_text:
            Optional alt text; empty is allowed.

        Returns
        -------
        Media
            The row that was inserted, reloaded from the database.

        Raises
        ------
        MediaRejectedError
            When any validation step fails (size, MIME, decode,
            dimensions).
        """
        sniffed_mime = self._check_payload(data)
        image_out = self._flatten_to_rgb(self._decode(data))

        now = datetime.now(timezone.utc)
        target_path = self._write_jpeg(image_out, now)
        try:
            media = self._insert_row(
                target_path=target_path,
                original_filename=original_filename,
                alt_text=alt_text,
                uploaded_by=uploaded_by,
                uploaded_at_iso=now.isoformat(),
            )
        except Exception:
            # No row references the file, so don't leave it orphaned.
            self._discard_file(target_path)
            raise

        self._audit.record(
            "media_uploaded",
            user_id=uploaded_by,
            detail={
                "media_id": media.id,
                "filename": media.filename,
                "size_bytes": media.size_bytes,
                "original_mime": sniffed_mime,
            },
        )
        return media

    # ------------------------------------------------------------------
    # URL helpers
    # ------------------------------------------------------------------
    def public_url(self, media: Media) -> str:
        """Return the URL the public site uses to fetch ``media``.

        Built from the configured prefix plus the stored path relative
        to ``media_root``. A stored path outside the media root falls
        back to ``<prefix>/<filename>`` so no filesystem path leaks.
        """
        try:
            rel = Path(media.stored_path).resolve().relative_to(
                self._media_root.resolve()
            )
        except (ValueError, OSError):
            return f"{self._public_prefix}/{media.filename}"
        return f"{self._public_prefix}/{rel.as_posix()}"

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    @staticmethod
    def _check_payload(data: bytes) -> str:
        """Enforce the size cap and MIME allowlist; return the sniffed MIME."""
        if len(data) == 0:
            raise MediaRejectedError("Empty upload.")
        if len(data) > MAX_UPLOAD_BYTES:
            raise MediaRejectedError(
                "Upload exceeds the 8 MB limit."
            )
        sniffed_mime = _sniff_mime(data)
        if sniffed_mime not in _ACCEPTED_MIME:
            raise MediaRejectedError(
                f"Unsupported image type ({sniffed_mime})."
            )
        return sniffed_mime

    @staticmethod
    def _decode(data: bytes) -> Image.Image:
        """Verify, bounds-check, then fully decode ``data`` with Pillow."""
        # Any Pillow failure is translated to one generic message so
        # the Pillow error string is never echoed to the admin UI; the
        # original exception stays attached as ``__cause__`` for logs.
        try:
            # verify() invalidates the image object, so it gets its own
            # throwaway instance.
            with Image.open(io.BytesIO(data)) as probe:
                probe.verify()
        except Exception as exc:  # noqa: BLE001
            raise MediaRejectedError("Image could not be decoded.") from exc

        try:
            image = Image.open(io.BytesIO(data))
        except Exception as exc:  # noqa: BLE001
            raise MediaRejectedError("Image could not be decoded.") from exc

        # Image.open() is lazy: the size comes from the header. Check
        # it BEFORE load() so an oversized image is rejected without
        # ever allocating its pixel buffer.
        width, height = image.size
        if width <= 0 or height <= 0:
            raise MediaRejectedError("Image has zero dimension.")
        if width > MAX_PIXEL_DIMENSION or height > MAX_PIXEL_DIMENSION:
            raise MediaRejectedError(
                "Image dimensions exceed the maximum allowed."
            )

        try:
            # Force the decode now so truncated / corrupt payloads that
            # verify() misses are caught here.
            image.load()
        except Exception as exc:  # noqa: BLE001
            raise MediaRejectedError("Image could not be decoded.") from exc
        return image

    @staticmethod
    def _flatten_to_rgb(image: Image.Image) -> Image.Image:
        """Return an RGB image, compositing any transparency onto white."""
        has_alpha = image.mode in ("RGBA", "LA") or (
            image.mode == "P" and "transparency" in image.info
        )
        if has_alpha:
            # Go through RGBA so the alpha band is well-defined, then
            # paste onto a white background using that band as mask.
            rgba = image.convert("RGBA")
            background = Image.new("RGB", rgba.size, (255, 255, 255))
            background.paste(rgba, mask=rgba.split()[-1])
            return background
        if image.mode != "RGB":
            return image.convert("RGB")
        return image

    def _write_jpeg(self, image: Image.Image, now: datetime) -> Path:
        """Save ``image`` as JPEG under a random name; return its path."""
        target_dir = self._media_root / f"{now:%Y}/{now:%m}"
        target_dir.mkdir(parents=True, exist_ok=True)
        target_path = target_dir / f"{secrets.token_urlsafe(16)}.jpg"
        try:
            image.save(
                target_path,
                format="JPEG",
                quality=_JPEG_QUALITY,
                optimize=True,
            )
        except Exception:
            # A failed save can leave a partial file behind.
            self._discard_file(target_path)
            raise
        return target_path

    def _insert_row(
        self,
        *,
        target_path: Path,
        original_filename: str,
        alt_text: str,
        uploaded_by: int,
        uploaded_at_iso: str,
    ) -> Media:
        """Insert the ``media`` row for ``target_path`` and reload it."""
        random_name = target_path.name
        params = {
            "filename": random_name,
            "original_filename": original_filename or random_name,
            "content_type": "image/jpeg",
            "size_bytes": int(target_path.stat().st_size),
            "stored_path": str(target_path),
            "alt_text": alt_text or "",
            "uploaded_by": int(uploaded_by),
            "uploaded_at": uploaded_at_iso,
        }
        with self._engine.begin() as conn:
            result = conn.execute(text(self._INSERT_SQL), params)
            new_id = int(result.lastrowid)  # type: ignore[arg-type]
            row = (
                conn.execute(text(self._SELECT_BY_ID_SQL), {"id": new_id})
                .mappings()
                .first()
            )
            if row is None:  # pragma: no cover — just inserted
                raise RuntimeError("failed to reload just-inserted media row")
        return row_to_media(row)

    @staticmethod
    def _discard_file(path: Path) -> None:
        """Best-effort removal of ``path``; never masks the caller's error."""
        try:
            path.unlink()
        except OSError:
            # Already gone or not removable — the caller is about to
            # re-raise the real failure, which matters more.
            pass
def _sniff_mime(data: bytes) -> str:
    """Ask python-magic for the MIME type of ``data``.

    Kept as a standalone function so there is a single seam to patch
    in tests, and so :mod:`magic` is imported only when an upload is
    actually sniffed rather than at module import.
    """
    # Deferred import: the module is only needed on the upload path.
    import magic

    # Only the leading 2048 bytes are handed to the sniffer.
    return magic.from_buffer(data[:2048], mime=True)

106
app/services/slugs.py Normal file
View File

@@ -0,0 +1,106 @@
"""Slug helpers for posts (and, eventually, any other slug-keyed row).
A slug is the URL-safe identifier used in public post URLs. Keeping the
algorithm tiny, dependency-free, and in its own module makes it easy to
test in isolation and to reuse for the Phase 4 admin create/update
flow.
Rules applied by :func:`slugify`:
- lowercase the input
- replace every run of non-alphanumeric characters with a single ``-``
- collapse consecutive ``-`` runs
- strip leading and trailing ``-``
- never return an empty string — callers that pass empty / all-punctuation
input get a deterministic fallback (``"post"``) so they can still
build a valid URL.
:func:`ensure_unique` suffixes ``-2``, ``-3`` ... on collision, checking
the database row presence via a callable the caller supplies. Keeping
the DB access injectable keeps this module trivially testable.
"""
from __future__ import annotations
import re
from typing import Callable
# Matches any run of characters outside ``[a-z0-9]``. Input is
# lowercased first, so non-ASCII letters fall into these runs and are
# not preserved in the slug.
_NON_ALNUM_RE: re.Pattern[str] = re.compile(r"[^a-z0-9]+")
# Returned when a title leaves nothing behind after normalization
# (e.g. only punctuation), so callers always get a usable slug.
_FALLBACK_SLUG: str = "post"


def slugify(title: str) -> str:
    """Derive a URL-safe slug from ``title``.

    Parameters
    ----------
    title:
        Human-authored title. Treated as untrusted; ``None`` / empty
        is tolerated.

    Returns
    -------
    str
        Lowercase ``[a-z0-9]`` words joined by single hyphens, with no
        leading or trailing hyphen. Never empty — falls back to
        :data:`_FALLBACK_SLUG` when no alphanumeric characters remain.
    """
    # Splitting on the separator runs and re-joining the non-empty
    # pieces gives one hyphen per run and none at either end.
    words = [
        piece
        for piece in _NON_ALNUM_RE.split((title or "").lower())
        if piece
    ]
    return "-".join(words) if words else _FALLBACK_SLUG
def ensure_unique(
    base: str,
    exists: Callable[[str], bool],
    *,
    max_attempts: int = 1000,
) -> str:
    """Pick a slug that ``exists`` reports as free.

    Parameters
    ----------
    base:
        Preferred slug; returned unchanged when it is free.
    exists:
        Predicate returning ``True`` when a candidate is already taken.
    max_attempts:
        Highest numeric suffix that will be tried, bounding the search
        so a predicate that always answers ``True`` cannot loop forever.

    Returns
    -------
    str
        ``base`` itself, or the first of ``base-2``, ``base-3``, ...
        up to ``base-<max_attempts>`` that is free.

    Raises
    ------
    RuntimeError
        When ``base`` and every suffixed candidate are taken.
    """
    if not exists(base):
        return base

    # Suffixes start at 2: the bare slug is the implicit "first" one.
    # The generator is lazy, so ``exists`` is consulted one candidate
    # at a time and stops at the first free slug.
    suffixed = (f"{base}-{n}" for n in range(2, max_attempts + 1))
    winner = next((slug for slug in suffixed if not exists(slug)), None)
    if winner is None:
        raise RuntimeError(
            f"could not allocate a unique slug after {max_attempts} attempts"
            f" (base={base!r})"
        )
    return winner