"""Tests for :class:`app.services.media.MediaService`.

Every payload is genuine image data generated with Pillow inside the test,
so the service's magic-byte check is exercised against real bytes.

The filesystem is NOT mocked: uploads are written to a per-test temporary
directory, which lets the tests re-open the stored file and confirm that it
is a valid JPEG after the re-encode pipeline.
"""

from __future__ import annotations

import io
from pathlib import Path

import pytest
from PIL import Image
from sqlalchemy import Engine

from app.services.audit import AuditService
from app.services.media import (
    MAX_UPLOAD_BYTES,
    MediaRejectedError,
    MediaService,
)

# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------


def _encode(image: Image.Image, fmt: str, **save_options: object) -> bytes:
    """Serialise *image* with Pillow in format *fmt* and return the bytes.

    Any extra keyword arguments are forwarded unchanged to ``Image.save``.
    """
    with io.BytesIO() as sink:
        image.save(sink, format=fmt, **save_options)
        return sink.getvalue()


def _jpeg_bytes(size: tuple[int, int] = (64, 48), color: str = "red") -> bytes:
    """Build a solid-colour RGB image and return it encoded as JPEG."""
    return _encode(Image.new("RGB", size, color), "JPEG", quality=85)


def _rgba_png_bytes(size: tuple[int, int] = (32, 32)) -> bytes:
    """Build a half-transparent green RGBA image and return it as PNG."""
    return _encode(Image.new("RGBA", size, (0, 128, 0, 128)), "PNG")


def _upload(service: MediaService, filename: str, payload: bytes):
    """Call ``save_upload`` as user ``1`` and return whatever it returns."""
    return service.save_upload(
        original_filename=filename,
        data=payload,
        uploaded_by=1,
    )


def _assert_rejected(service: MediaService, filename: str, payload: bytes) -> None:
    """Assert that uploading *payload* raises :class:`MediaRejectedError`."""
    with pytest.raises(MediaRejectedError):
        _upload(service, filename, payload)


@pytest.fixture
def media_root(tmp_path: Path) -> Path:
    """Create and return a fresh ``media`` directory inside pytest's tmp dir."""
    directory = tmp_path / "media"
    directory.mkdir()
    return directory


@pytest.fixture
def service(db_engine: Engine, media_root: Path) -> MediaService:
    """Build a :class:`MediaService` on the real engine and the temp dir."""
    return MediaService(
        engine=db_engine,
        media_root=str(media_root),
        audit=AuditService(db_engine),
    )


# ---------------------------------------------------------------------------
# happy path
# ---------------------------------------------------------------------------


def test_save_upload_stores_jpeg_and_returns_media(
    service: MediaService, media_root: Path
) -> None:
    """A valid JPEG upload yields populated metadata and an RGB JPEG on disk."""
    media = _upload(service, "cute-chick.jpg", _jpeg_bytes())

    assert media.id > 0
    assert media.content_type == "image/jpeg"
    assert media.filename.endswith(".jpg")
    assert media.original_filename == "cute-chick.jpg"
    assert media.size_bytes > 0

    stored = Path(media.stored_path)
    assert stored.exists()
    # The file is expected to sit two directory levels below the media root.
    assert stored.parent.parent.parent == media_root

    # Re-open the written file to confirm it decodes as an RGB JPEG.
    with Image.open(stored) as reopened:
        assert reopened.format == "JPEG"
        assert reopened.mode == "RGB"


def test_save_upload_assigns_random_filename_not_original(
    service: MediaService,
) -> None:
    """The stored filename differs from the name supplied by the client."""
    media = _upload(service, "secret.jpg", _jpeg_bytes())

    assert media.filename != "secret.jpg"

    # Random component is 16 bytes url-safe (~22 chars) + ".jpg".
    token, _, extension = media.filename.rpartition(".")
    assert extension == "jpg"
    assert len(token) >= 16


def test_public_url_is_under_media_prefix(service: MediaService) -> None:
    """``public_url`` starts with ``/media/`` and ends with the filename."""
    media = _upload(service, "ok.jpg", _jpeg_bytes())

    url = service.public_url(media)

    assert url.startswith("/media/")
    assert url.endswith(media.filename)


def test_save_upload_flattens_rgba_onto_white(service: MediaService) -> None:
    """An RGBA PNG upload ends up on disk as an RGB JPEG."""
    media = _upload(service, "transparent.png", _rgba_png_bytes())

    with Image.open(Path(media.stored_path)) as reopened:
        assert reopened.format == "JPEG"
        assert reopened.mode == "RGB"


# ---------------------------------------------------------------------------
# rejection paths
# ---------------------------------------------------------------------------


def test_reject_empty_upload(service: MediaService) -> None:
    """A zero-byte payload is rejected."""
    _assert_rejected(service, "empty.jpg", b"")


def test_reject_non_image_bytes(service: MediaService) -> None:
    """Plain text carrying a ``.jpg`` name is rejected."""
    _assert_rejected(
        service,
        "totally.jpg",
        b"this is definitely not an image payload" * 10,
    )


def test_reject_gif_upload(service: MediaService) -> None:
    """A real GIF payload is rejected (animated-GIF hazard, not allowlisted)."""
    gif_payload = _encode(Image.new("RGB", (16, 16), "green"), "GIF")
    _assert_rejected(service, "animated.gif", gif_payload)


def test_reject_oversize_upload(service: MediaService) -> None:
    """A payload one byte larger than ``MAX_UPLOAD_BYTES`` is rejected."""
    oversized = b"\xff" * (MAX_UPLOAD_BYTES + 1)
    _assert_rejected(service, "huge.jpg", oversized)