diff --git a/app/browser.py b/app/browser.py deleted file mode 100644 index ae08f63..0000000 --- a/app/browser.py +++ /dev/null @@ -1,511 +0,0 @@ -""" -app/browser.py - -Page fetcher + analysis orchestrator for SneakyScope. -- Fetches a URL (HTML, redirects, etc.) -- Runs the Suspicious Rules Engine (PASS/FAIL for all rules) -- Writes artifacts (screenshot.png, source.txt, results.json) into /data// -- Returns a single 'result' dict suitable for UI and future API - -Design notes: -- Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules). -- This module keeps "plumbing" only (fetch, extract, persist). -- Minimal non-detection heuristics remain here (e.g., skip benign script MIME types). - -Assumptions: -- Flask app context is active (uses current_app for logger and RULE_ENGINE). -- SANDBOX_STORAGE is configured (default: /data). -- enrich_url(url) returns enrichment dict. -""" - -import json -import uuid -import re -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple -from urllib.parse import urlparse - -from bs4 import BeautifulSoup -from flask import current_app -from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError - -from app.utils.io_helpers import safe_write -from .enrichment import enrich_url - -from .utils.settings import get_settings - -settings = get_settings() - - -# --------------------------------------------------------------------------- -# Engine access helpers -# --------------------------------------------------------------------------- - -def get_rule_engine(): - """ - Retrieve the rules engine instance from the Flask application config. - - Returns: - RuleEngine or None: The engine if available, or None if not configured. 
- """ - try: - return current_app.config.get("RULE_ENGINE") - except Exception: - return None - - -def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]: - """ - Summarize a list of engine rule result dicts (result = "PASS"|"FAIL"). - - Returns: - {'fail_count': int, 'total_rules': int} - """ - summary = {"fail_count": 0, "total_rules": 0} - index = 0 - total = len(results) - while index < total: - item = results[index] - summary["total_rules"] = summary["total_rules"] + 1 - if str(item.get("result", "")).upper() == "FAIL": - summary["fail_count"] = summary["fail_count"] + 1 - index = index + 1 - return summary - - -def run_rule_checks(text: str, category: str) -> Dict[str, Any]: - """ - Run all rules for a given category against provided text, returning a table-friendly model. - - Args: - text: Text to analyze (HTML, snippet, etc.) - category: One of 'form', 'script', 'text' (or any category your rules use) - - Returns: - { - "checks": [ - { "name": str, "description": str, "category": str, - "result": "PASS"|"FAIL", "reason": Optional[str], - "severity": Optional[str], "tags": Optional[List[str]] }, ... 
- ], - "summary": { "fail_count": int, "total_rules": int } - } - """ - out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}} - engine = get_rule_engine() - - if engine is None: - return out - - try: - engine_results = engine.run_all(text, category=category) # list of dicts - # Normalize explicitly - index = 0 - total = len(engine_results) - while index < total: - item = engine_results[index] - normalized = { - "name": item.get("name"), - "description": item.get("description"), - "category": item.get("category"), - "result": item.get("result"), # "PASS" | "FAIL" - "reason": item.get("reason"), # present on FAIL by engine design - "severity": item.get("severity"), - "tags": item.get("tags"), - } - out["checks"].append(normalized) - index = index + 1 - - out["summary"] = _summarize_results(out["checks"]) - except Exception as exc: - # Preserve shape; record the error as a synthetic PASS (so UI doesn't break) - out["checks"].append({ - "name": "engine_error", - "description": "Rule engine failed during evaluation", - "category": category, - "result": "PASS", - "reason": f"{exc}", - "severity": None, - "tags": None - }) - out["summary"] = {"fail_count": 0, "total_rules": 1} - - return out - - -def build_rule_checks_overview(full_html_text: str) -> List[Dict[str, Any]]: - """ - Build a top-level overview for the results page: runs each category across - the entire HTML and groups results by category. - - Returns: - [ - {"category": "script", "results": [ ...engine dicts... ], "summary": {...}}, - {"category": "form", "results": [ ... ], "summary": {...}}, - {"category": "text", "results": [ ... 
], "summary": {...}}, - ] - """ - overview: List[Dict[str, Any]] = [] - engine = get_rule_engine() - - categories = ["script", "form", "text"] - index = 0 - total = len(categories) - - while index < total: - cat = categories[index] - block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}} - - if engine is not None: - try: - results = engine.run_all(full_html_text, category=cat) - block["results"] = results - block["summary"] = _summarize_results(results) - except Exception as exc: - block["results"] = [{ - "name": "engine_error", - "description": "Rule engine failed during overview evaluation", - "category": cat, - "result": "PASS", - "reason": f"{exc}", - "severity": None, - "tags": None - }] - block["summary"] = {"fail_count": 0, "total_rules": 1} - - overview.append(block) - index = index + 1 - - return overview - - -# --------------------------------------------------------------------------- -# Form & Script analysis (plumbing only; detection is in the rules engine) -# --------------------------------------------------------------------------- - -def analyze_forms(html: str, base_url: str) -> List[Dict[str, Any]]: - """ - Parse forms from the page HTML and apply rule-based checks (engine), keeping - only simple plumbing heuristics here (no security logic). - - Returns list of dicts with keys: - - action, method, inputs - - flagged (bool), flag_reasons (list[str]), status (str) - - rule_checks: {'checks': [...], 'summary': {...}} (per-form snippet evaluation) - - Note: - The 'flagged' value is now purely a legacy visual hint based on simple - heuristics; the authoritative PASS/FAIL details are in rule_checks. - As you migrate heuristics into function rules, this 'flagged' may be - removed entirely. 
- """ - soup = BeautifulSoup(html, "lxml") - forms_info: List[Dict[str, Any]] = [] - page_hostname = urlparse(base_url).hostname - - for form in soup.find_all("form"): - action = form.get("action") - method = form.get("method", "get").lower() - - # Build explicit inputs list - inputs: List[Dict[str, Any]] = [] - for inp in form.find_all("input"): - input_name = inp.get("name") - input_type = inp.get("type", "text") - inputs.append({"name": input_name, "type": input_type}) - - # Minimal legacy flags (kept for UI continuity; detection lives in engine) - flagged_reasons: List[str] = [] - - if not action or str(action).strip() == "": - flagged_reasons.append("No action specified") - else: - try: - action_host = urlparse(action).hostname - if not str(action).startswith("/") and action_host != page_hostname: - flagged_reasons.append("Submits to a different host") - except Exception: - pass - - try: - if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https": - flagged_reasons.append("Submits over insecure HTTP") - except Exception: - pass - - for hidden in form.find_all("input", type="hidden"): - name_value = hidden.get("name") or "" - if "password" in name_value.lower(): - flagged_reasons.append("Hidden password field") - - flagged = bool(flagged_reasons) - - # Serialize a simple form snippet for rule category='form' - snippet_lines = [] - snippet_lines.append(f"base_url={base_url}") - snippet_lines.append(f"base_hostname={page_hostname}") - snippet_lines.append(f"action={action}") - snippet_lines.append(f"method={method}") - snippet_lines.append("inputs=") - - i = 0 - n = len(inputs) - while i < n: - item = inputs[i] - snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}") - i = i + 1 - form_snippet = "\n".join(snippet_lines) - - # Per-form rule checks (PASS/FAIL list via engine) - rule_checks = run_rule_checks(form_snippet, category="form") - - forms_info.append({ - "action": action, - "method": method, - "inputs": inputs, - 
"flagged": flagged, - "flag_reasons": flagged_reasons, - "status": "flagged" if flagged else "possibly safe", - "rule_checks": rule_checks - }) - - return forms_info - - -def analyze_scripts(html: str, base_url: str = "") -> List[Dict[str, Any]]: - """ - Collect script artifacts and evaluate per-script matches via the rules engine. - Only include rows that matched at least one rule. Inline scripts are checked - against regex rules using their text; external scripts are checked against - function rules using a small 'facts' dict (src/hosts). - - Returns list of dicts like: - { - "type": "external" | "inline" | "unknown", - "src": "...", # for external - "content_snippet": "...", # for inline - "rules": [ { "name": "...", "description": "..." }, ... ] - } - """ - soup = BeautifulSoup(html, "lxml") - results: List[Dict[str, Any]] = [] - - # Benign MIME types we ignore entirely (non-detection plumbing) - benign_types = {"application/ld+json", "application/json"} - - engine = get_rule_engine() - base_hostname = urlparse(base_url).hostname or "" - - for script in soup.find_all("script"): - try: - src = (script.get("src") or "").strip() - s_type_attr = (script.get("type") or "").strip().lower() - inline_text = script.get_text(strip=True) or "" - - # Skip benign structured data outright (noise control) - if s_type_attr in benign_types: - continue - - record: Dict[str, Any] = {} - if src: - record["type"] = "external" - record["src"] = src - elif inline_text: - record["type"] = "inline" - record["content_snippet"] = (inline_text[:settings.ui.snippet_preview_len]).replace("\n", " ") - else: - record["type"] = "unknown" - - # --- Per-script evaluation: gather matches from engine rules - matches: List[Dict[str, str]] = [] - if engine is not None: - # Inline content → run regex script rules against the text - if inline_text: - for r in engine.rules: - if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "regex": - ok, reason = r.run(inline_text) - if 
ok: - matches.append({ - "name": getattr(r, "name", "unknown_rule"), - "description": getattr(r, "description", "") or (reason or ""), - "severity": getattr(r, "severity", None), - "tags": getattr(r, "tags", None), - }) - - # External src → run function script rules with facts - if src: - facts = { - "src": src, - "base_url": base_url, - "base_hostname": base_hostname, - "src_hostname": urlparse(src).hostname or "", - "category": "script", - } - for r in engine.rules: - if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "function": - ok, reason = r.run(facts) - if ok: - matches.append({ - "name": getattr(r, "name", "unknown_rule"), - "description": (reason or "") or getattr(r, "description", ""), - "severity": getattr(r, "severity", None), - "tags": getattr(r, "tags", None), - }) - - # Only keep rows that matched at least one rule - if matches: - record["rules"] = matches - results.append(record) - - except Exception as exc: - results.append({ - "type": "unknown", - "heuristics": [f"Script analysis error: {exc}"] - }) - - return results - - - -# --------------------------------------------------------------------------- -# Fetcher / Orchestrator -# --------------------------------------------------------------------------- - -async def fetch_page_artifacts(url: str, storage_dir: Path) -> Dict[str, Any]: - """ - Fetch page artifacts and save them in a UUID-based directory. - - Writes: - - /data//screenshot.png - - /data//source.txt - - /data//results.json (single source of truth for routes) - - Returns: - result dict with keys used by templates (and future API). 
- """ - run_uuid = str(uuid.uuid4()) - run_dir = storage_dir / run_uuid - run_dir.mkdir(parents=True, exist_ok=True) - - screenshot_path = run_dir / "screenshot.png" - source_path = run_dir / "source.txt" - results_path = run_dir / "results.json" - - redirects: List[Dict[str, Any]] = [] - downloads: List[Dict[str, Any]] = [] - scripts_seen: List[str] = [] - - async with async_playwright() as pw: - browser = await pw.chromium.launch( - headless=True, - args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled"] - ) - context = await browser.new_context( - viewport={"width": 1920, "height": 1080}, - user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", - java_script_enabled=True, - locale="en-US" - ) - page = await context.new_page() - - # Event handlers (plumbing) - def _on_response(resp): - try: - if 300 <= resp.status <= 399: - redirects.append({"status": resp.status, "url": resp.url}) - except Exception: - pass - - def _on_download(d): - try: - downloads.append({"url": d.url, "suggested_filename": d.suggested_filename}) - except Exception: - pass - - def _on_request(r): - try: - if r.url.endswith((".js", ".vbs", ".hta")): - scripts_seen.append(r.url) - except Exception: - pass - - page.on("response", _on_response) - page.on("download", _on_download) - page.on("request", _on_request) - - try: - await page.goto(url, wait_until="networkidle", timeout=60000) - final_url = page.url - await page.screenshot(path=str(screenshot_path), full_page=True) - html = await page.content() - safe_write(source_path, html) - except PWTimeoutError: - final_url = page.url - safe_write(source_path, "Page did not fully load (timeout)") - await page.screenshot(path=str(screenshot_path), full_page=True) - - await context.close() - await browser.close() - - # Read back saved source - html_content = source_path.read_text(encoding="utf-8") - - # Forms analysis (per-form rule checks) 
- forms_info = analyze_forms(html_content, final_url) - - # Scripts artifacts (no detection here) - suspicious_scripts = analyze_scripts(html_content, base_url=final_url) - - # Enrichment - enrichment = enrich_url(url) - - # Global PASS/FAIL table per category (entire document) - rule_checks_overview = build_rule_checks_overview(html_content) - - for blk in rule_checks_overview: - current_app.logger.debug(f"[rules] {blk['category']}: {blk['summary']}") - - - # Assemble single result dict - result: Dict[str, Any] = { - "uuid": run_uuid, - "submitted_url": url, - "final_url": final_url, - "redirects": redirects, - "downloads": downloads, - "scripts": scripts_seen, - "forms": forms_info, - "suspicious_scripts": suspicious_scripts, - "rule_checks": rule_checks_overview, # table-ready for UI - "enrichment": enrichment - } - - # Persist as the single source of truth for routes - safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False)) - - try: - current_app.logger.info(f"[browser] Saved results.json for run {run_uuid}") - except Exception: - pass - - return result - - -def load_results(storage_dir: Path, run_uuid: str) -> Optional[Dict[str, Any]]: - """ - Load a prior run's results.json from /data//. 
- - Returns: - dict or None - """ - run_dir = storage_dir / run_uuid - results_path = run_dir / "results.json" - if not results_path.exists(): - return None - - try: - text = results_path.read_text(encoding="utf-8") - data = json.loads(text) - return data - except Exception: - return None diff --git a/app/config/bec_words.yaml b/app/config/bec_words.yaml deleted file mode 100644 index e40941f..0000000 --- a/app/config/bec_words.yaml +++ /dev/null @@ -1,5 +0,0 @@ -words: - - "reset password" - - "open document" - - "view document" - - "verify account" diff --git a/app/routes.py b/app/routes.py index 85f1593..69b580b 100644 --- a/app/routes.py +++ b/app/routes.py @@ -5,8 +5,9 @@ from pathlib import Path from datetime import datetime from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, send_file, abort -from .browser import fetch_page_artifacts -from .enrichment import enrich_url +# from .browser import fetch_page_artifacts +from .utils.browser import get_browser +from .utils.enrichment import enrich_url from .utils.settings import get_settings from .utils.io_helpers import get_recent_results @@ -64,9 +65,8 @@ def analyze(): storage.mkdir(parents=True, exist_ok=True) try: - engine = current_app.config.get("RULE_ENGINE") - result = asyncio.run(fetch_page_artifacts(url, storage)) - # result = asyncio.run(fetch_page_artifacts(url, storage)) + browser = get_browser() + result = asyncio.run(browser.fetch_page_artifacts(url)) current_app.logger.info(f"[+] Analysis done for {url}") except Exception as e: flash(f"Analysis failed: {e}", "error") diff --git a/app/utils/browser.py b/app/utils/browser.py new file mode 100644 index 0000000..2f52457 --- /dev/null +++ b/app/utils/browser.py @@ -0,0 +1,522 @@ +""" +app/browser.py + +Singleton, lazily-loaded page fetcher + analysis orchestrator for SneakyScope. + +Responsibilities: +- Fetch a URL (HTML, redirects, etc.) 
+- Run the Suspicious Rules Engine (PASS/FAIL for all rules) +- Write artifacts (screenshot.png, source.txt, results.json) into /data// +- Return a single 'result' dict suitable for UI and future API + +Design notes: +- Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules). +- This module keeps "plumbing" only (fetch, extract, persist). +- Minimal non-detection heuristics remain here (e.g., skip benign script MIME types). + +Assumptions: +- Flask app context is active (uses current_app for logger and RULE_ENGINE). +- SANDBOX_STORAGE is configured (default: /data). +- enrich_url(url) returns enrichment dict. +""" + +from __future__ import annotations + +import json +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse + +from bs4 import BeautifulSoup +from flask import current_app +from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError + +from app.utils.io_helpers import safe_write +from app.utils.enrichment import enrich_url +from app.utils.settings import get_settings + +# Load settings once for constants / defaults +settings = get_settings() + + +class Browser: + """ + Orchestrates page fetching and analysis. Meant to be accessed via the + lazily-loaded singleton factory `get_browser()`. + """ + + def __init__(self, storage_dir: Optional[Path] = None) -> None: + """ + Args: + storage_dir: Base directory for run artifacts. Defaults to settings.sandbox.storage + (typically /data) if not provided. 
+ """ + if storage_dir is None: + try: + # Prefer your settings model’s configured storage path + storage_dir = Path(settings.sandbox.storage) + except Exception: + storage_dir = Path("/data") + + self.storage_dir: Path = storage_dir + + # ----------------------------------------------------------------------- + # Engine access helpers + # ----------------------------------------------------------------------- + @staticmethod + def _get_rule_engine(): + """ + Retrieve the rules engine instance from the Flask application config. + + Returns: + RuleEngine or None: The engine if available, or None if not configured. + """ + try: + return current_app.config.get("RULE_ENGINE") + except Exception: + return None + + @staticmethod + def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]: + """ + Summarize a list of engine rule result dicts (result = "PASS"|"FAIL"). + + Returns: + {'fail_count': int, 'total_rules': int} + """ + summary = {"fail_count": 0, "total_rules": 0} + index = 0 + total = len(results) + while index < total: + item = results[index] + summary["total_rules"] = summary["total_rules"] + 1 + if str(item.get("result", "")).upper() == "FAIL": + summary["fail_count"] = summary["fail_count"] + 1 + index = index + 1 + return summary + + def run_rule_checks(self, text: str, category: str) -> Dict[str, Any]: + """ + Run all rules for a given category against provided text, returning a table-friendly model. + + Args: + text: Text to analyze (HTML, snippet, etc.) + category: One of 'form', 'script', 'text' (or any category your rules use) + + Returns: + { + "checks": [ + { "name": str, "description": str, "category": str, + "result": "PASS"|"FAIL", "reason": Optional[str], + "severity": Optional[str], "tags": Optional[List[str]] }, ... 
+ ], + "summary": { "fail_count": int, "total_rules": int } + } + """ + out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}} + engine = self._get_rule_engine() + + if engine is None: + return out + + try: + engine_results = engine.run_all(text, category=category) # list of dicts + index = 0 + total = len(engine_results) + while index < total: + item = engine_results[index] + normalized = { + "name": item.get("name"), + "description": item.get("description"), + "category": item.get("category"), + "result": item.get("result"), # "PASS" | "FAIL" + "reason": item.get("reason"), # present on FAIL by engine design + "severity": item.get("severity"), + "tags": item.get("tags"), + } + out["checks"].append(normalized) + index = index + 1 + + out["summary"] = self._summarize_results(out["checks"]) + except Exception as exc: + # Preserve shape; record the error as a synthetic PASS (so UI doesn't break) + out["checks"].append({ + "name": "engine_error", + "description": "Rule engine failed during evaluation", + "category": category, + "result": "PASS", + "reason": f"{exc}", + "severity": None, + "tags": None + }) + out["summary"] = {"fail_count": 0, "total_rules": 1} + + return out + + def build_rule_checks_overview(self, full_html_text: str) -> List[Dict[str, Any]]: + """ + Build a top-level overview for the results page: runs each category across + the entire HTML and groups results by category. + + Returns: + [ + {"category": "script", "results": [ ...engine dicts... ], "summary": {...}}, + {"category": "form", "results": [ ... ], "summary": {...}}, + {"category": "text", "results": [ ... 
], "summary": {...}}, + ] + """ + overview: List[Dict[str, Any]] = [] + engine = self._get_rule_engine() + + categories = ["script", "form", "text"] + index = 0 + total = len(categories) + + while index < total: + cat = categories[index] + block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}} + + if engine is not None: + try: + results = engine.run_all(full_html_text, category=cat) + block["results"] = results + block["summary"] = self._summarize_results(results) + except Exception as exc: + block["results"] = [{ + "name": "engine_error", + "description": "Rule engine failed during overview evaluation", + "category": cat, + "result": "PASS", + "reason": f"{exc}", + "severity": None, + "tags": None + }] + block["summary"] = {"fail_count": 0, "total_rules": 1} + + overview.append(block) + index = index + 1 + + return overview + + # ----------------------------------------------------------------------- + # Form & Script analysis (plumbing only; detection is in the rules engine) + # ----------------------------------------------------------------------- + def analyze_forms(self, html: str, base_url: str) -> List[Dict[str, Any]]: + """ + Parse forms from the page HTML and apply rule-based checks (engine), keeping + only simple plumbing heuristics here (no security logic). 
+ + Returns list of dicts with keys: + - action, method, inputs + - flagged (bool), flag_reasons (list[str]), status (str) + - rule_checks: {'checks': [...], 'summary': {...}} (per-form snippet evaluation) + """ + soup = BeautifulSoup(html, "lxml") + forms_info: List[Dict[str, Any]] = [] + page_hostname = urlparse(base_url).hostname + + for form in soup.find_all("form"): + action = form.get("action") + method = form.get("method", "get").lower() + + inputs: List[Dict[str, Any]] = [] + for inp in form.find_all("input"): + input_name = inp.get("name") + input_type = inp.get("type", "text") + inputs.append({"name": input_name, "type": input_type}) + + flagged_reasons: List[str] = [] + + if not action or str(action).strip() == "": + flagged_reasons.append("No action specified") + else: + try: + action_host = urlparse(action).hostname + if not str(action).startswith("/") and action_host != page_hostname: + flagged_reasons.append("Submits to a different host") + except Exception: + pass + + try: + if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https": + flagged_reasons.append("Submits over insecure HTTP") + except Exception: + pass + + for hidden in form.find_all("input", type="hidden"): + name_value = hidden.get("name") or "" + if "password" in name_value.lower(): + flagged_reasons.append("Hidden password field") + + flagged = bool(flagged_reasons) + + # Serialize a simple form snippet for rule category='form' + snippet_lines = [] + snippet_lines.append(f"base_url={base_url}") + snippet_lines.append(f"base_hostname={page_hostname}") + snippet_lines.append(f"action={action}") + snippet_lines.append(f"method={method}") + snippet_lines.append("inputs=") + + i = 0 + n = len(inputs) + while i < n: + item = inputs[i] + snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}") + i = i + 1 + form_snippet = "\n".join(snippet_lines) + + # Per-form rule checks (PASS/FAIL list via engine) + rule_checks = self.run_rule_checks(form_snippet, 
category="form") + + forms_info.append({ + "action": action, + "method": method, + "inputs": inputs, + "flagged": flagged, + "flag_reasons": flagged_reasons, + "status": "flagged" if flagged else "possibly safe", + "rule_checks": rule_checks + }) + + return forms_info + + def analyze_scripts(self, html: str, base_url: str = "") -> List[Dict[str, Any]]: + """ + Collect script artifacts and evaluate per-script matches via the rules engine. + Only include rows that matched at least one rule. + """ + soup = BeautifulSoup(html, "lxml") + results: List[Dict[str, Any]] = [] + + benign_types = {"application/ld+json", "application/json"} + + engine = self._get_rule_engine() + base_hostname = urlparse(base_url).hostname or "" + + for script in soup.find_all("script"): + try: + src = (script.get("src") or "").strip() + s_type_attr = (script.get("type") or "").strip().lower() + inline_text = script.get_text(strip=True) or "" + + if s_type_attr in benign_types: + continue + + record: Dict[str, Any] = {} + if src: + record["type"] = "external" + record["src"] = src + elif inline_text: + # respect your UI snippet config + preview_len = getattr(settings.ui, "snippet_preview_len", 200) + record["type"] = "inline" + record["content_snippet"] = (inline_text[:preview_len]).replace("\n", " ") + else: + record["type"] = "unknown" + + matches: List[Dict[str, Any]] = [] + if engine is not None: + if inline_text: + for r in engine.rules: + if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "regex": + ok, reason = r.run(inline_text) + if ok: + matches.append({ + "name": getattr(r, "name", "unknown_rule"), + "description": getattr(r, "description", "") or (reason or ""), + "severity": getattr(r, "severity", None), + "tags": getattr(r, "tags", None), + }) + + if src: + facts = { + "src": src, + "base_url": base_url, + "base_hostname": base_hostname, + "src_hostname": urlparse(src).hostname or "", + "category": "script", + } + for r in engine.rules: + if 
getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "function": + ok, reason = r.run(facts) + if ok: + matches.append({ + "name": getattr(r, "name", "unknown_rule"), + "description": (reason or "") or getattr(r, "description", ""), + "severity": getattr(r, "severity", None), + "tags": getattr(r, "tags", None), + }) + + if matches: + record["rules"] = matches + results.append(record) + + except Exception as exc: + results.append({ + "type": "unknown", + "heuristics": [f"Script analysis error: {exc}"] + }) + + return results + + # ----------------------------------------------------------------------- + # Fetcher / Orchestrator + # ----------------------------------------------------------------------- + async def fetch_page_artifacts(self, url: str) -> Dict[str, Any]: + """ + Fetch page artifacts and save them in a UUID-based directory for this Browser's storage_dir. + + Writes: + - /data//screenshot.png + - /data//source.txt + - /data//results.json (single source of truth for routes) + + Returns: + result dict with keys used by templates (and future API). 
+ """ + run_uuid = str(uuid.uuid4()) + run_dir = self.storage_dir / run_uuid + run_dir.mkdir(parents=True, exist_ok=True) + + screenshot_path = run_dir / "screenshot.png" + source_path = run_dir / "source.txt" + results_path = run_dir / "results.json" + + redirects: List[Dict[str, Any]] = [] + downloads: List[Dict[str, Any]] = [] + scripts_seen: List[str] = [] + + async with async_playwright() as pw: + browser = await pw.chromium.launch( + headless=True, + args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled"] + ) + context = await browser.new_context( + viewport={"width": 1920, "height": 1080}, + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", + java_script_enabled=True, + locale="en-US" + ) + page = await context.new_page() + + # Event handlers (plumbing) + def _on_response(resp): + try: + if 300 <= resp.status <= 399: + redirects.append({"status": resp.status, "url": resp.url}) + except Exception: + pass + + def _on_download(d): + try: + downloads.append({"url": d.url, "suggested_filename": d.suggested_filename}) + except Exception: + pass + + def _on_request(r): + try: + if r.url.endswith((".js", ".vbs", ".hta")): + scripts_seen.append(r.url) + except Exception: + pass + + page.on("response", _on_response) + page.on("download", _on_download) + page.on("request", _on_request) + + try: + await page.goto(url, wait_until="networkidle", timeout=60000) + final_url = page.url + await page.screenshot(path=str(screenshot_path), full_page=True) + html = await page.content() + safe_write(source_path, html) + except PWTimeoutError: + final_url = page.url + safe_write(source_path, "Page did not fully load (timeout)") + await page.screenshot(path=str(screenshot_path), full_page=True) + + await context.close() + await browser.close() + + # Read back saved source + html_content = source_path.read_text(encoding="utf-8") + + # Forms analysis (per-form rule 
checks) + forms_info = self.analyze_forms(html_content, final_url) + + # Scripts artifacts (no detection here) + suspicious_scripts = self.analyze_scripts(html_content, base_url=final_url) + + # Enrichment + enrichment = enrich_url(url) + + # Global PASS/FAIL table per category (entire document) + rule_checks_overview = self.build_rule_checks_overview(html_content) + + try: + for blk in rule_checks_overview: + current_app.logger.debug(f"[rules] {blk['category']}: {blk['summary']}") + except Exception: + pass + + # Assemble single result dict + result: Dict[str, Any] = { + "uuid": run_uuid, + "submitted_url": url, + "final_url": final_url, + "redirects": redirects, + "downloads": downloads, + "scripts": scripts_seen, + "forms": forms_info, + "suspicious_scripts": suspicious_scripts, + "rule_checks": rule_checks_overview, # table-ready for UI + "enrichment": enrichment + } + + # Persist as the single source of truth for routes + safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False)) + + try: + current_app.logger.info(f"[browser] Saved results.json for run {run_uuid}") + except Exception: + pass + + return result + +# --------------------------------------------------------------------------- +# Lazy-loaded singleton factory +# --------------------------------------------------------------------------- + +# Prefer importing your project-wide singleton decorator. +try: + from app.utils.settings import singleton_loader # if we already export it +except Exception: + # Local fallback if import is not available. 
+ from typing import Callable, TypeVar + import functools + T = TypeVar("T") + def singleton_loader(func: Callable[..., T]) -> Callable[..., T]: + """Ensure the function only runs once, returning the cached value.""" + cache: dict[str, T] = {} + @functools.wraps(func) + def wrapper(*args, **kwargs) -> T: + if func.__name__ not in cache: + cache[func.__name__] = func(*args, **kwargs) + return cache[func.__name__] + return wrapper + + +@singleton_loader +def get_browser(storage_dir: Optional[Path] = None) -> Browser: + """ + Lazily construct and cache a singleton Browser instance. + + Args: + storage_dir: Optional override for artifact base directory. + + Returns: + Browser: The singleton instance. + """ + return Browser(storage_dir=storage_dir) diff --git a/app/enrichment.py b/app/utils/enrichment.py similarity index 93% rename from app/enrichment.py rename to app/utils/enrichment.py index 61bcc3b..4c88ef2 100644 --- a/app/enrichment.py +++ b/app/utils/enrichment.py @@ -19,14 +19,6 @@ logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") cache = get_cache("/data/cache.db") settings = get_settings() -# Load BEC words -BEC_WORDS_FILE = Path(__file__).parent.parent / "config" / "bec_words.yaml" -if BEC_WORDS_FILE.exists(): - with open(BEC_WORDS_FILE, "r", encoding="utf-8") as f: - BEC_WORDS = yaml.safe_load(f).get("words", []) -else: - BEC_WORDS = [] - # 24 hours * 60 minutes days = 24 * 60