# sneakyscope/app/utils/external_fetch.py
import hashlib
import os
import logging
from dataclasses import dataclass
from typing import Optional, Tuple, List
from urllib.parse import urljoin, urlparse

import requests

from app.utils.settings import get_settings

settings = get_settings()

_ALLOWED_SCHEMES = {"http", "https"}


@dataclass
class FetchResult:
    """
    Outcome for a single external script fetch.
    """
    ok: bool
    reason: str
    source_url: str
    final_url: str
    status_code: Optional[int]
    content_type: Optional[str]
    bytes_fetched: int
    truncated: bool
    sha256_hex: Optional[str]
    saved_path: Optional[str]


class ExternalScriptFetcher:
    """
    Minimal, safe-by-default fetcher for external JS files.

    Notes / assumptions:
    - All artifacts for this run live under the UUID-backed `results_path`
      you pass in.
    - Saves bytes to: <results_path>/<index>.js
    - Manual redirects up to `max_redirects`.
    - Streaming with a hard byte cap derived from `max_total_mb`.
    - Never raises network exceptions to callers; failures are encoded in
      FetchResult.
    - Settings are read from get_settings().external_fetch with sane defaults.
    """

    def __init__(self, results_path: str, session: Optional[requests.Session] = None):
        """
        Args:
            results_path: Absolute path to the run's UUID directory
                (e.g., /data/<uuid>).
            session: Optional requests.Session to reuse connections; a new
                one is created if not provided.
        """
        # Derived value: MiB -> bytes
        self.max_total_bytes: int = settings.external_fetch.max_total_mb * 1024 * 1024

        # Logger (module-qualified name rather than a filesystem path)
        self.logger = logging.getLogger(__name__)

        # Where to write artifacts for this job/run (UUID directory)
        self.results_path = results_path

        # HTTP session with a predictable UA
        self.session = session or requests.Session()
        self.session.headers.update({"User-Agent": "SneakyScope/1.0"})

    # -------------------------
    # Internal helper methods
    # -------------------------

    def _timeout(self) -> Tuple[float, float]:
        """
        Compute (connect_timeout, read_timeout) in seconds from max_time_ms.

        Keeps a conservative split so either phase gets a fair chance.
        """
        total = max(0.1, settings.external_fetch.max_time_ms / 1000.0)
        connect = min(1.5, total * 0.5)  # cap connect timeout
        read = max(0.5, total * 0.5)     # floor read timeout
        return (connect, read)

    def _scheme_allowed(self, url: str) -> bool:
        """
        Return True if URL uses an allowed scheme (http/https).
        """
        scheme = (urlparse(url).scheme or "").lower()
        return scheme in _ALLOWED_SCHEMES

    def _artifact_path(self, index: int) -> str:
        """
        Build an output path like: <results_path>/<index>.js

        Ensures the directory exists.
        """
        # Make sure parent directories exist (idempotent)
        os.makedirs(self.results_path, exist_ok=True)
        filename = f"{index}.js"
        return os.path.join(self.results_path, filename)

    # -------------------------
    # Public API
    # -------------------------

    def fetch_one(self, script_url: str, index: int) -> FetchResult:
        """
        Fetch exactly one external script with manual redirect handling and
        a hard per-file byte cap.

        Args:
            script_url: The script URL to retrieve.
            index: Numeric index used solely for naming the artifact file
                (<index>.js).

        Returns:
            FetchResult with status, metadata, and saved path (if successful).
        """
        # Feature gate: allow callers to rely on a consistent failure when
        # globally disabled.
        if not settings.external_fetch.enabled:
            return FetchResult(
                ok=False,
                reason="Feature disabled",
                source_url=script_url,
                final_url=script_url,
                status_code=None,
                content_type=None,
                bytes_fetched=0,
                truncated=False,
                sha256_hex=None,
                saved_path=None,
            )

        # Scheme guard: refuse anything not http/https in this v1.
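        # NOTE: only the URL scheme is validated here; hostnames and IP
        # ranges are not filtered, so private or link-local targets remain
        # reachable in this v1 (worth revisiting if SSRF is a concern).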
        if not self._scheme_allowed(script_url):
            return FetchResult(
                ok=False,
                reason="Scheme not allowed",
                source_url=script_url,
                final_url=script_url,
                status_code=None,
                content_type=None,
                bytes_fetched=0,
                truncated=False,
                sha256_hex=None,
                saved_path=None,
            )

        current_url = script_url
        status_code: Optional[int] = None
        content_type: Optional[str] = None
        redirects_followed = 0

        # Manual redirect loop so we can enforce max_redirects precisely.
        while True:
            try:
                resp = self.session.get(
                    current_url,
                    stream=True,
                    allow_redirects=False,
                    timeout=self._timeout(),
                )
            except requests.exceptions.Timeout:
                return FetchResult(
                    ok=False,
                    reason="Timeout",
                    source_url=script_url,
                    final_url=current_url,
                    status_code=status_code,
                    content_type=content_type,
                    bytes_fetched=0,
                    truncated=False,
                    sha256_hex=None,
                    saved_path=None,
                )
            except requests.exceptions.RequestException as e:
                return FetchResult(
                    ok=False,
                    reason=f"Network error: {e.__class__.__name__}",
                    source_url=script_url,
                    final_url=current_url,
                    status_code=status_code,
                    content_type=content_type,
                    bytes_fetched=0,
                    truncated=False,
                    sha256_hex=None,
                    saved_path=None,
                )

            status_code = resp.status_code
            content_type = resp.headers.get("Content-Type")

            # Handle redirects explicitly (3xx with Location)
            if status_code in (301, 302, 303, 307, 308) and "Location" in resp.headers:
                if redirects_followed >= settings.external_fetch.max_redirects:
                    return FetchResult(
                        ok=False,
                        reason="Max redirects exceeded",
                        source_url=script_url,
                        final_url=current_url,
                        status_code=status_code,
                        content_type=content_type,
                        bytes_fetched=0,
                        truncated=False,
                        sha256_hex=None,
                        saved_path=None,
                    )
                next_url = urljoin(current_url, resp.headers["Location"])
                if not self._scheme_allowed(next_url):
                    return FetchResult(
                        ok=False,
                        reason="Redirect to disallowed scheme",
                        source_url=script_url,
                        final_url=next_url,
                        status_code=status_code,
                        content_type=content_type,
                        bytes_fetched=0,
                        truncated=False,
                        sha256_hex=None,
                        saved_path=None,
                    )
                # Release the streamed (unread) response before the next hop,
                # so we don't hold a connection per redirect.
                resp.close()
                current_url = next_url
                redirects_followed += 1
                # Loop to follow next hop
                continue

            # Not a redirect: stream response body with a hard byte cap.
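            # The cap is enforced on bytes actually read, not on the
            # Content-Length header, which may be absent or inaccurate.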
            cap = self.max_total_bytes
            total = 0
            truncated = False
            chunks: List[bytes] = []

            try:
                for chunk in resp.iter_content(chunk_size=8192):
                    if not chunk:
                        # Skip keep-alive chunks
                        continue
                    new_total = total + len(chunk)
                    if new_total > cap:
                        # Only keep what fits and stop
                        remaining = cap - total
                        if remaining > 0:
                            chunks.append(chunk[:remaining])
                            total += remaining
                        truncated = True
                        break
                    chunks.append(chunk)
                    total = new_total
            except requests.exceptions.Timeout:
                return FetchResult(
                    ok=False,
                    reason="Timeout while reading",
                    source_url=script_url,
                    final_url=current_url,
                    status_code=status_code,
                    content_type=content_type,
                    bytes_fetched=total,
                    truncated=truncated,
                    sha256_hex=None,
                    saved_path=None,
                )
            except requests.exceptions.RequestException as e:
                return FetchResult(
                    ok=False,
                    reason=f"Network error while reading: {e.__class__.__name__}",
                    source_url=script_url,
                    final_url=current_url,
                    status_code=status_code,
                    content_type=content_type,
                    bytes_fetched=total,
                    truncated=truncated,
                    sha256_hex=None,
                    saved_path=None,
                )
            finally:
                # Release the connection whether or not the body was fully
                # read (a truncated read leaves unread bytes on the wire).
                resp.close()

            data = b"".join(chunks)
            if not data:
                return FetchResult(
                    ok=False,
                    reason="Empty response",
                    source_url=script_url,
                    final_url=current_url,
                    status_code=status_code,
                    content_type=content_type,
                    bytes_fetched=0,
                    truncated=False,
                    sha256_hex=None,
                    saved_path=None,
                )

            # Persist to <results_path>/<index>.js
            out_path = self._artifact_path(index)
            try:
                with open(out_path, "wb") as f:
                    f.write(data)
            except OSError as e:
                return FetchResult(
                    ok=False,
                    reason=f"Write error: {e.__class__.__name__}",
                    source_url=script_url,
                    final_url=current_url,
                    status_code=status_code,
                    content_type=content_type,
                    bytes_fetched=total,
                    truncated=truncated,
                    sha256_hex=None,
                    saved_path=None,
                )

            sha256_hex = hashlib.sha256(data).hexdigest()

            # Structured log line for visibility/metrics
            try:
                self.logger.info(
                    "External script fetched",
                    extra={
                        "source_url": script_url,
                        "final_url": current_url,
                        "status": status_code,
                        "bytes": total,
                        "truncated": truncated,
                        "sha256": sha256_hex,
                        "saved_path": out_path,
                    },
                )
            except Exception:
                # Logging should never break the pipeline
                pass

            return FetchResult(
                ok=True,
                reason="OK",
                source_url=script_url,
                final_url=current_url,
                status_code=status_code,
                content_type=content_type,
                bytes_fetched=total,
                truncated=truncated,
                sha256_hex=sha256_hex,
                saved_path=out_path,
            )
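

if __name__ == "__main__":
    # Usage sketch (illustrative only, not part of the public API): the
    # script URL below is a hypothetical example, the results directory is a
    # throwaway temp dir, and settings.external_fetch.enabled must be True
    # for the fetch to proceed.
    import tempfile

    demo_dir = tempfile.mkdtemp(prefix="sneakyscope-demo-")
    fetcher = ExternalScriptFetcher(results_path=demo_dir)
    result = fetcher.fetch_one("https://example.com/static/app.js", index=0)
    if result.ok:
        print(f"saved {result.bytes_fetched} bytes to {result.saved_path}")
    else:
        print(f"fetch failed: {result.reason}")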