""" app/browser.py Singleton, lazily-loaded page fetcher + analysis orchestrator for SneakyScope. Responsibilities: - Fetch a URL (HTML, redirects, etc.) - Run the Suspicious Rules Engine (PASS/FAIL for all rules) - Write artifacts (screenshot.png, source.txt, results.json) into /data// - Return a single 'result' dict suitable for UI and future API Design notes: - Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules). - This module keeps "plumbing" only (fetch, extract, persist). - Minimal non-detection heuristics remain here (e.g., skip benign script MIME types). Assumptions: - Flask app context is active (uses current_app for logger and RULE_ENGINE). - SANDBOX_STORAGE is configured (default: /data). - enrich_url(url) returns enrichment dict. """ from __future__ import annotations import json import uuid from pathlib import Path from typing import Any, Dict, List, Optional from urllib.parse import urlparse from bs4 import BeautifulSoup from flask import current_app from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError from app.utils.io_helpers import safe_write from app.utils.enrichment import enrich_url from app.utils.settings import get_settings from app.logging_setup import get_app_logger # Load settings once for constants / defaults settings = get_settings() logger = get_app_logger() class Browser: """ Orchestrates page fetching and analysis. Meant to be accessed via the lazily-loaded singleton factory `get_browser()`. """ def __init__(self, storage_dir: Optional[Path] = None) -> None: """ Args: storage_dir: Base directory for run artifacts. Defaults to settings.sandbox.storage (typically /data) if not provided. """ if storage_dir is None: try: # Prefer your settings model’s configured storage path storage_dir = Path(settings.sandbox.storage) except Exception: storage_dir = Path("/data") self.storage_dir: Path = storage_dir # ----------------------------------------------------------------------- # Engine access helpers # ----------------------------------------------------------------------- @staticmethod def _get_rule_engine(): """ Retrieve the rules engine instance from the Flask application config. Returns: RuleEngine or None: The engine if available, or None if not configured. """ try: return current_app.config.get("RULE_ENGINE") except Exception: return None @staticmethod def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]: """ Summarize a list of engine rule result dicts (result = "PASS"|"FAIL"). Returns: {'fail_count': int, 'total_rules': int} """ summary = {"fail_count": 0, "total_rules": 0} index = 0 total = len(results) while index < total: item = results[index] summary["total_rules"] = summary["total_rules"] + 1 if str(item.get("result", "")).upper() == "FAIL": summary["fail_count"] = summary["fail_count"] + 1 index = index + 1 return summary def run_rule_checks(self, text: str, category: str) -> Dict[str, Any]: """ Run all rules for a given category against provided text, returning a table-friendly model. Args: text: Text to analyze (HTML, snippet, etc.) category: One of 'form', 'script', 'text' (or any category your rules use) Returns: { "checks": [ { "name": str, "description": str, "category": str, "result": "PASS"|"FAIL", "reason": Optional[str], "severity": Optional[str], "tags": Optional[List[str]] }, ... ], "summary": { "fail_count": int, "total_rules": int } } """ out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}} engine = self._get_rule_engine() if engine is None: return out try: engine_results = engine.run_all(text, category=category) # list of dicts index = 0 total = len(engine_results) while index < total: item = engine_results[index] normalized = { "name": item.get("name"), "description": item.get("description"), "category": item.get("category"), "result": item.get("result"), # "PASS" | "FAIL" "reason": item.get("reason"), # present on FAIL by engine design "severity": item.get("severity"), "tags": item.get("tags"), } out["checks"].append(normalized) index = index + 1 out["summary"] = self._summarize_results(out["checks"]) except Exception as exc: # Preserve shape; record the error as a synthetic PASS (so UI doesn't break) out["checks"].append({ "name": "engine_error", "description": "Rule engine failed during evaluation", "category": category, "result": "PASS", "reason": f"{exc}", "severity": None, "tags": None }) out["summary"] = {"fail_count": 0, "total_rules": 1} return out def build_rule_checks_overview(self, full_html_text: str) -> List[Dict[str, Any]]: """ Build a top-level overview for the results page: runs each category across the entire HTML and groups results by category. Returns: [ {"category": "script", "results": [ ...engine dicts... ], "summary": {...}}, {"category": "form", "results": [ ... ], "summary": {...}}, {"category": "text", "results": [ ... ], "summary": {...}}, ] """ overview: List[Dict[str, Any]] = [] engine = self._get_rule_engine() categories = ["script", "form", "text"] index = 0 total = len(categories) while index < total: cat = categories[index] block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}} if engine is not None: try: results = engine.run_all(full_html_text, category=cat) block["results"] = results block["summary"] = self._summarize_results(results) except Exception as exc: block["results"] = [{ "name": "engine_error", "description": "Rule engine failed during overview evaluation", "category": cat, "result": "PASS", "reason": f"{exc}", "severity": None, "tags": None }] block["summary"] = {"fail_count": 0, "total_rules": 1} overview.append(block) index = index + 1 return overview # ----------------------------------------------------------------------- # Form & Script analysis (plumbing only; detection is in the rules engine) # ----------------------------------------------------------------------- def analyze_forms(self, html: str, base_url: str = "") -> List[Dict[str, Any]]: """ Collect form artifacts and evaluate per-form matches via the rules engine. Only include rows that matched at least one rule. Returns list of dicts with keys (per matched form): - type: "form" - action, method, inputs - content_snippet: str - rules: List[{name, description, severity?, tags?}] """ soup = BeautifulSoup(html, "lxml") results: List[Dict[str, Any]] = [] engine = self._get_rule_engine() base_hostname = urlparse(base_url).hostname or "" # Match how scripts picks preview len try: preview_len = getattr(settings.ui, "snippet_preview_len", 200) # keep parity with scripts except Exception: preview_len = 200 for form in soup.find_all("form"): try: action = (form.get("action") or "").strip() method = (form.get("method") or "get").strip().lower() inputs: List[Dict[str, Any]] = [] for inp in form.find_all("input"): inputs.append({ "name": inp.get("name"), "type": (inp.get("type") or "text").strip().lower(), }) # Use the actual form markup for regex rules form_markup = str(form) # UI-friendly snippet content_snippet = form_markup[:preview_len] matches: List[Dict[str, Any]] = [] if engine is not None: for r in getattr(engine, "rules", []): if getattr(r, "category", None) != "form": continue rtype = getattr(r, "rule_type", None) try: ok = False reason = "" if rtype == "regex": # Run against the raw form HTML ok, reason = r.run(form_markup) elif rtype == "function": # Structured facts for function-style rules facts = { "category": "form", "base_url": base_url, "base_hostname": base_hostname, "action": action, "action_hostname": urlparse(action).hostname or "", "method": method, "inputs": inputs, "markup": form_markup, } ok, reason = r.run(facts) else: continue if ok: matches.append({ "name": getattr(r, "name", "unknown_rule"), "description": (reason or "") or getattr(r, "description", ""), "severity": getattr(r, "severity", None), "tags": getattr(r, "tags", None), }) except Exception as rule_exc: # Be defensive—bad rule shouldn't break the form pass try: logger.debug("Form rule error", extra={"rule": getattr(r, "name", "?"), "error": str(rule_exc)}) except Exception: pass continue if matches: results.append({ "type": "form", "action": action, "method": method, "inputs": inputs, "content_snippet": content_snippet, "rules": matches, }) except Exception as exc: # Keep analysis resilient try: logger.error("Form analysis error", extra={"error": str(exc)}) except Exception: pass results.append({ "type": "form", "heuristics": [f"Form analysis error: {exc}"], }) return results def analyze_scripts(self, html: str, base_url: str = "") -> List[Dict[str, Any]]: """ Collect script artifacts and evaluate per-script matches via the rules engine. Only include rows that matched at least one rule. """ soup = BeautifulSoup(html, "lxml") results: List[Dict[str, Any]] = [] benign_types = {"application/ld+json", "application/json"} engine = self._get_rule_engine() base_hostname = urlparse(base_url).hostname or "" for script in soup.find_all("script"): try: src = (script.get("src") or "").strip() s_type_attr = (script.get("type") or "").strip().lower() inline_text = script.get_text(strip=True) or "" if s_type_attr in benign_types: continue record: Dict[str, Any] = {} if src: record["type"] = "external" record["src"] = src elif inline_text: # respect your UI snippet config preview_len = getattr(settings.ui, "snippet_preview_len", 200) record["type"] = "inline" record["content_snippet"] = (inline_text[:preview_len]).replace("\n", " ") else: record["type"] = "unknown" matches: List[Dict[str, Any]] = [] if engine is not None: if inline_text: for r in engine.rules: if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "regex": ok, reason = r.run(inline_text) if ok: matches.append({ "name": getattr(r, "name", "unknown_rule"), "description": getattr(r, "description", "") or (reason or ""), "severity": getattr(r, "severity", None), "tags": getattr(r, "tags", None), }) if src: facts = { "src": src, "base_url": base_url, "base_hostname": base_hostname, "src_hostname": urlparse(src).hostname or "", "category": "script", } for r in engine.rules: if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "function": ok, reason = r.run(facts) if ok: matches.append({ "name": getattr(r, "name", "unknown_rule"), "description": (reason or "") or getattr(r, "description", ""), "severity": getattr(r, "severity", None), "tags": getattr(r, "tags", None), }) if matches: record["rules"] = matches results.append(record) except Exception as exc: results.append({ "type": "unknown", "heuristics": [f"Script analysis error: {exc}"] }) return results # ----------------------------------------------------------------------- # Fetcher / Orchestrator # ----------------------------------------------------------------------- async def fetch_page_artifacts(self, url: str, fetch_ssl_enabled:bool=False) -> Dict[str, Any]: """ Fetch page artifacts and save them in a UUID-based directory for this Browser's storage_dir. Writes: - /data//screenshot.png - /data//source.html - /data//results.json (single source of truth for routes) Returns: result dict with keys used by templates (and future API). """ run_uuid = str(uuid.uuid4()) run_dir = self.storage_dir / run_uuid run_dir.mkdir(parents=True, exist_ok=True) screenshot_path = run_dir / "screenshot.png" source_path = run_dir / "source.html" results_path = run_dir / "results.json" redirects: List[Dict[str, Any]] = [] downloads: List[Dict[str, Any]] = [] scripts_seen: List[str] = [] async with async_playwright() as pw: browser = await pw.chromium.launch( headless=True, args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled"] ) context = await browser.new_context( viewport={"width": 1920, "height": 1080}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", java_script_enabled=True, locale="en-US" ) page = await context.new_page() # Event handlers (plumbing) def _on_response(resp): try: if 300 <= resp.status <= 399: redirects.append({"status": resp.status, "url": resp.url}) except Exception: pass def _on_download(d): try: downloads.append({"url": d.url, "suggested_filename": d.suggested_filename}) except Exception: pass def _on_request(r): try: if r.url.endswith((".js", ".vbs", ".hta")): scripts_seen.append(r.url) except Exception: pass page.on("response", _on_response) page.on("download", _on_download) page.on("request", _on_request) try: await page.goto(url, wait_until="networkidle", timeout=60000) final_url = page.url await page.screenshot(path=str(screenshot_path), full_page=True) html = await page.content() safe_write(source_path, html) except PWTimeoutError: final_url = page.url safe_write(source_path, "Page did not fully load (timeout)") await page.screenshot(path=str(screenshot_path), full_page=True) await context.close() await browser.close() # Read back saved source html_content = source_path.read_text(encoding="utf-8") # Forms analysis (per-form rule checks) forms_info = self.analyze_forms(html_content, final_url) # Scripts artifacts (no detection here) suspicious_scripts = self.analyze_scripts(html_content, base_url=final_url) # Enrichment enrichment = enrich_url(url, fetch_ssl_enabled) # Global PASS/FAIL table per category (entire document) rule_checks_overview = self.build_rule_checks_overview(html_content) try: for blk in rule_checks_overview: current_app.logger.debug(f"[rules] {blk['category']}: {blk['summary']}") except Exception: pass # Assemble single result dict result: Dict[str, Any] = { "uuid": run_uuid, "submitted_url": url, "final_url": final_url, "redirects": redirects, "downloads": downloads, "scripts": scripts_seen, "forms": forms_info, "suspicious_scripts": suspicious_scripts, "rule_checks": rule_checks_overview, # table-ready for UI "enrichment": enrichment } # Persist as the single source of truth for routes safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False)) try: logger.info(f"Saved results.json for run {run_uuid}") except Exception: pass return result # --------------------------------------------------------------------------- # Lazy-loaded singleton factory # --------------------------------------------------------------------------- # Prefer importing your project-wide singleton decorator. try: from app.utils.settings import singleton_loader # if we already export it except Exception: # Local fallback if import is not available. from typing import Callable, TypeVar import functools T = TypeVar("T") def singleton_loader(func: Callable[..., T]) -> Callable[..., T]: """Ensure the function only runs once, returning the cached value.""" cache: dict[str, T] = {} @functools.wraps(func) def wrapper(*args, **kwargs) -> T: if func.__name__ not in cache: cache[func.__name__] = func(*args, **kwargs) return cache[func.__name__] return wrapper @singleton_loader def get_browser(storage_dir: Optional[Path] = None) -> Browser: """ Lazily construct and cache a singleton Browser instance. Args: storage_dir: Optional override for artifact base directory. Returns: Browser: The singleton instance. """ return Browser(storage_dir=storage_dir)