diff --git a/app/__init__.py b/app/__init__.py index 7188aad..f59c5f2 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -19,7 +19,8 @@ from .rules.function_rules import ( form_action_missing, ) -from . import routes # blueprint +from app.blueprints import ui # ui blueprint +from app.blueprints import api # api blueprint # from .utils import io_helpers # if need logging/setup later # from .utils import cache_db # available for future injections @@ -136,7 +137,8 @@ def create_app() -> Flask: app.config["APP_VERSION"] = f"v{settings.app.version_major}.{settings.app.version_minor}" # Register blueprints - app.register_blueprint(routes.bp) + app.register_blueprint(ui.bp) + app.register_blueprint(api.api_bp) # Example log lines so we know we booted cleanly app.logger.info(f"SneakyScope started: {app.config['APP_NAME']} {app.config['APP_VERSION']}") diff --git a/app/blueprints/api.py b/app/blueprints/api.py new file mode 100644 index 0000000..ba2a2bb --- /dev/null +++ b/app/blueprints/api.py @@ -0,0 +1,212 @@ +# app/blueprints/api.py +""" +API blueprint for JSON endpoints. + +Endpoints: + POST /api/analyze_script + Body: + { + "job_id": "", # or "uuid": "" + "url": "https://cdn.example.com/app.js", + "category": "script" # optional, defaults to "script" + } + Response: + { + "ok": true, + "final_url": "...", + "status_code": 200, + "bytes": 12345, + "truncated": false, + "sha256": "...", + "artifact_path": "/abs/path/to//scripts/fetched/.js", + "findings": [ { "name": "...", "description": "...", "severity": "...", "tags": [...], "reason": "..." }, ... ], + "snippet": "", + "snippet_len": 45678 + } +""" + +import os +import time +from flask import Blueprint, request, jsonify, current_app, send_file, abort +from pathlib import Path + +from app.utils.settings import get_settings +from app.utils.external_fetcher import ExternalScriptFetcher +from werkzeug.exceptions import HTTPException + +api_bp = Blueprint("api", __name__, url_prefix="/api") + + +def _resolve_results_path(job_id: str) -> str: + """ + Compute the absolute results directory for a given job UUID. + Prefers /artifacts/, falls back to /. + """ + base_dir = "/data" + + candidate_with_artifacts = os.path.join(base_dir, job_id) + if os.path.isdir(candidate_with_artifacts): + return candidate_with_artifacts + + fallback = os.path.join(base_dir, job_id) + os.makedirs(fallback, exist_ok=True) + return fallback + + +def _make_snippet(text: str, max_chars: int = 1200) -> str: + """Produce a trimmed, safe-to-render snippet of the script contents.""" + if not text: + return "" + snippet = text.strip() + return (snippet[:max_chars] + "…") if len(snippet) > max_chars else snippet + +@api_bp.errorhandler(400) +@api_bp.errorhandler(403) +@api_bp.errorhandler(404) +@api_bp.errorhandler(405) +def _api_err(err): + """ + Return JSON for common client errors. + """ + if isinstance(err, HTTPException): + code = err.code + name = (err.name or "error").lower() + else: + code = 400 + name = "error" + return jsonify({"ok": False, "error": name}), code + + +@api_bp.errorhandler(500) +def _api_500(err): + """ + Return JSON for server errors and log the exception. + """ + try: + current_app.logger.exception("API 500") + except Exception: + pass + return jsonify({"ok": False, "error": "internal server error"}), 500 + + +@api_bp.post("/analyze_script") +def analyze_script(): + """ + Analyze EXACTLY one external script URL for a given job UUID. 
+ + Expected JSON body: + { "job_id": "", "url": "https://cdn.example.com/app.js", "category": "script" } + """ + body = request.get_json(silent=True) or {} + + job_id_raw = body.get("job_id") or body.get("uuid") + script_url_raw = body.get("url") + category = (body.get("category") or "script").strip() or None # default to "script" + + job_id = (job_id_raw or "").strip() if isinstance(job_id_raw, str) else "" + script_url = (script_url_raw or "").strip() if isinstance(script_url_raw, str) else "" + + # log this request + current_app.logger.info(f"Got request to analyze {script_url} via API ") + + if not job_id or not script_url: + return jsonify({"ok": False, "error": "Missing job_id (or uuid) or url"}), 400 + + settings = get_settings() + + if not settings.external_fetch.enabled: + return jsonify({"ok": False, "error": "Feature disabled"}), 400 + + # Resolve the UUID-backed results directory for this run. + results_path = _resolve_results_path(job_id) + + # Initialize the fetcher; it reads its own settings internally. + fetcher = ExternalScriptFetcher(results_path=results_path) + + # Unique index for the saved file name: /scripts/fetched/.js + unique_index = int(time.time() * 1000) + + outcome = fetcher.fetch_one(script_url=script_url, index=unique_index) + if not outcome.ok or not outcome.saved_path: + return jsonify({ + "ok": False, + "error": outcome.reason, + "status_code": outcome.status_code, + "final_url": outcome.final_url + }), 502 + + # Read bytes and decode to UTF-8 for rules and snippet + try: + with open(outcome.saved_path, "rb") as fh: + js_text = fh.read().decode("utf-8", errors="ignore") + except Exception: + js_text = "" + + # Pull the rules engine from the app (prefer attribute, then config). + findings = [] + try: + engine = getattr(current_app, "rule_engine", None) + if engine is None: + engine = current_app.config.get("RULE_ENGINE") + except Exception: + engine = None + + if engine is not None and hasattr(engine, "run_all"): + try: + # run_all returns PASS/FAIL for each rule; we only surface FAIL (matched) to the UI + all_results = engine.run_all(js_text, category=category) + if isinstance(all_results, list): + matched = [] + for r in all_results: + try: + if (r.get("result") == "FAIL"): + matched.append({ + "name": r.get("name"), + "description": r.get("description"), + "severity": r.get("severity"), + "tags": r.get("tags") or [], + "reason": r.get("reason"), + "category": r.get("category"), + }) + except Exception: + # Ignore malformed entries + continue + findings = matched + except Exception as exc: + try: + current_app.logger.error("Rule engine error", extra={"error": str(exc)}) + except Exception: + pass + findings = [] + + snippet = _make_snippet(js_text, max_chars=settings.ui.snippet_preview_len) + + return jsonify({ + "ok": True, + "final_url": outcome.final_url, + "status_code": outcome.status_code, + "bytes": outcome.bytes_fetched, + "truncated": outcome.truncated, + "sha256": outcome.sha256_hex, + "artifact_path": outcome.saved_path, + "findings": findings, # only FAILed rules + "snippet": snippet, + "snippet_len": len(js_text) + }) + + +@api_bp.get("/artifacts//") +def get_artifact_raw(run_uuid, filename): + # prevent path traversal + if "/" in filename or ".." 
in filename: + abort(400) + + run_dir = _resolve_results_path(run_uuid) + full_path = Path(run_dir) / filename + + # if file is not there, give a 404 + if not os.path.isfile(full_path): + abort(404) + + # else return file + return send_file(full_path, as_attachment=False) \ No newline at end of file diff --git a/app/routes.py b/app/blueprints/ui.py similarity index 84% rename from app/routes.py rename to app/blueprints/ui.py index 69b580b..992ce3f 100644 --- a/app/routes.py +++ b/app/blueprints/ui.py @@ -1,3 +1,5 @@ +# app/blueprints/ui.py + import os import json import asyncio @@ -5,11 +7,10 @@ from pathlib import Path from datetime import datetime from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, send_file, abort -# from .browser import fetch_page_artifacts -from .utils.browser import get_browser -from .utils.enrichment import enrich_url -from .utils.settings import get_settings -from .utils.io_helpers import get_recent_results +from app.utils.browser import get_browser +from app.utils.enrichment import enrich_url +from app.utils.settings import get_settings +from app.utils.io_helpers import get_recent_results bp = Blueprint("main", __name__) @@ -34,9 +35,6 @@ def index(): The number of recent runs is controlled via settings.cache.recent_runs_count (int). Falls back to 10 if not present or invalid. """ - # Resolve SANDBOX_STORAGE from app config - storage = Path(current_app.config["SANDBOX_STORAGE"]).resolve() - # Pull recent count from settings with a safe fallback try: # settings is already initialized at module import in your file @@ -46,13 +44,15 @@ def index(): except Exception: recent_count = 10 + # Resolve SANDBOX_STORAGE from app config + storage = Path(current_app.config["SANDBOX_STORAGE"]).resolve() + # Build the recent list (non-fatal if storage is empty or unreadable) recent_results = get_recent_results(storage, recent_count, current_app.logger) # Pass to template; your index.html will hide the card if list is empty return render_template("index.html", recent_results=recent_results) - @bp.route("/analyze", methods=["POST"]) def analyze(): url = request.form.get("url", "").strip() @@ -60,7 +60,7 @@ def analyze(): if not url: flash("Please enter a URL.", "error") return redirect(url_for("main.index")) - + storage = Path(current_app.config["SANDBOX_STORAGE"]).resolve() storage.mkdir(parents=True, exist_ok=True) @@ -87,6 +87,7 @@ def analyze(): @bp.route("/results/", methods=["GET"]) def view_result(run_uuid: str): + # Resolve SANDBOX_STORAGE from app config storage = Path(current_app.config["SANDBOX_STORAGE"]).resolve() run_dir = storage / run_uuid results_path = run_dir / "results.json" @@ -105,6 +106,7 @@ def view_result(run_uuid: str): @bp.route("/artifacts//", methods=["GET"]) def artifacts(run_uuid: str, filename: str): + # Resolve SANDBOX_STORAGE from app config storage = Path(current_app.config["SANDBOX_STORAGE"]).resolve() run_dir = storage / run_uuid full_path = run_dir / filename @@ -123,3 +125,11 @@ def artifacts(run_uuid: str, filename: str): return send_file(full_path) +@bp.get("/view/artifact//") +def view_artifact(run_uuid, filename): + # Build a safe raw URL that streams the file (you said you already have this route) + raw_url = url_for('api.get_artifact_raw', run_uuid=run_uuid, filename=filename) + # Optional: derive language server-side if you prefer + language = None # e.g., 'javascript' + return render_template('viewer.html', filename=filename, raw_url=raw_url, language=language) + diff --git 
a/app/config/settings.yaml b/app/config/settings.yaml index 00f4aa2..05b58ef 100644 --- a/app/config/settings.yaml +++ b/app/config/settings.yaml @@ -8,5 +8,11 @@ cache: whois_cache_days: 7 geoip_cache_days: 7 +external_script_fetch: + enabled: True + max_total_mb: 5 + max_time_ms: 3000 + max_redirects: 3 + ui: snippet_preview_len: 300 diff --git a/app/rules/function_rules.py b/app/rules/function_rules.py index 54af20a..4e6baf8 100644 --- a/app/rules/function_rules.py +++ b/app/rules/function_rules.py @@ -22,6 +22,7 @@ from __future__ import annotations from typing import Any, Dict, Optional from urllib.parse import urlparse +_NOOP_ACTIONS = {"", "#", "javascript:void(0)", "javascript:void(0);"} # --------------------------------------------------------------------------- # Adapters @@ -169,35 +170,48 @@ def script_third_party_host(facts: Dict[str, Any]): # ---------------- Form rules ---------------- -def form_submits_to_different_host(facts: Dict[str, Any]): - """Flags
actions that submit to a different hostname than the page.""" - base_host = facts.get("base_hostname") or "" - action = facts.get("action") or "" - try: - action_host = urlparse(action).hostname - if action_host and base_host and action_host != base_host: - return True, "Form submits to a different host" - except Exception: - # Parsing failed; treat as no match rather than erroring out - pass +def form_action_missing(facts: Dict[str, Any]): + """Flags elements with no meaningful action attribute.""" + action = (facts.get("action") or "").strip() + if action in _NOOP_ACTIONS: + return True, "Form has no action attribute (or uses a no-op action)" return False, None def form_http_on_https_page(facts: Dict[str, Any]): """Flags forms submitting over HTTP while the page was loaded over HTTPS.""" - base_url = facts.get("base_url") or "" - action = facts.get("action") or "" + base_url = (facts.get("base_url") or "").strip() + action = (facts.get("action") or "").strip() + try: - if urlparse(base_url).scheme == "https" and urlparse(action).scheme == "http": - return True, "Form submits over insecure HTTP" + base_scheme = (urlparse(base_url).scheme or "").lower() + parsed_act = urlparse(action) + act_scheme = (parsed_act.scheme or "").lower() except Exception: - pass + return False, None # parsing trouble → don’t flag + + # Only flag absolute http:// actions on https pages. + # Relative or schemeless ('//host/...') isn’t flagged here (it won’t be HTTP on an HTTPS page). + if base_scheme == "https" and act_scheme == "http": + return True, f"Submits over insecure HTTP (action={parsed_act.geturl()})" return False, None -def form_action_missing(facts: Dict[str, Any]): - """Flags elements with no action attribute.""" - action = (facts.get("action") or "").strip() - if not action: - return True, "Form has no action attribute" - return False, None +def form_submits_to_different_host(facts: Dict[str, Any]): + """Flags actions that submit to a different hostname than the page.""" + base_host = (facts.get("base_hostname") or "").strip().lower() + action = (facts.get("action") or "").strip() + + if not action or action in _NOOP_ACTIONS: + return False, None + + try: + parsed = urlparse(action) + act_host = (parsed.hostname or "").lower() + except Exception: + return False, None + + # Only compare when the action specifies a host (absolute URL or schemeless //host/path). 
+ if act_host and base_host and act_host != base_host: + return True, f"Submits to a different host ({act_host} vs {base_host})" + return False, None \ No newline at end of file diff --git a/app/static/style.css b/app/static/style.css index 75f54f3..2b4df62 100644 --- a/app/static/style.css +++ b/app/static/style.css @@ -279,6 +279,7 @@ details ul, details p { } } +/* SCRIPTS TABLE */ .scripts-table td ul { margin: 0.25rem 0 0.25rem 1rem; padding-left: 1rem; @@ -305,6 +306,59 @@ details ul, details p { white-space: nowrap; } + +/* lists & small text inside cells */ +.forms-table td ul { + margin: 0.25rem 0 0.25rem 1rem; + padding-left: 1rem; +} +.forms-table td small { + opacity: 0.85; +} + +/* keep the table from exploding */ +.forms-table { + table-layout: fixed; + width: 100%; +} + +/* columns: Action | Method | Inputs | Matches | Form Snippet */ +.forms-table th:nth-child(1) { width: 15rem; } /* Action */ +.forms-table th:nth-child(2) { width: 5rem; } /* Method */ +.forms-table th:nth-child(3) { width: 15rem; } /* Inputs */ +.forms-table th:nth-child(5) { width: 24rem; } /* Snippet */ +.forms-table th:nth-child(4) { width: auto; } /* Matches grows */ + +/* ellipsize cells by default */ +.forms-table td, +.forms-table th { + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +/* nicer wrapping inside snippet/details & input chips */ +.forms-table details { white-space: normal; } +.forms-table details > pre.code { + white-space: pre-wrap; /* let long lines wrap */ + max-height: 28rem; + overflow: auto; +} +.forms-table .chips { + display: flex; + gap: 0.25rem; + flex-wrap: wrap; + white-space: normal; /* allow chip text to wrap if needed */ +} + +/* (optional) responsive tweaks */ +@media (max-width: 1200px) { + .forms-table th:nth-child(1) { width: 22rem; } + .forms-table th:nth-child(3) { width: 16rem; } + .forms-table th:nth-child(5) { width: 18rem; } +} + + /* let URLs/snippets wrap *inside* their cell when expanded content shows */ .breakable { white-space: normal; diff --git a/app/templates/base.html b/app/templates/base.html index 79744e2..faa5870 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -30,4 +30,7 @@ {{ app_name }} - A self-hosted URL analysis sandbox - {{ app_version }} - \ No newline at end of file + + +{% block page_js %} +{% endblock %} \ No newline at end of file diff --git a/app/templates/index.html b/app/templates/index.html index f16f60b..db17f85 100644 --- a/app/templates/index.html +++ b/app/templates/index.html @@ -90,6 +90,9 @@ 100% { transform: rotate(360deg); } } +{% endblock %} + +{% block page_js %} + + +{% endblock %} \ No newline at end of file diff --git a/app/templates/viewer.html b/app/templates/viewer.html new file mode 100644 index 0000000..a18f3b2 --- /dev/null +++ b/app/templates/viewer.html @@ -0,0 +1,111 @@ +{% extends "base.html" %} +{% block content %} +
+  <!-- viewer.html content elided: a "Code Viewer" card showing "File: {{ filename }}",
+       with "Open raw" and "Download" links that point at the raw_url supplied by the
+       main.view_artifact route -->
+ + + + + +{% endblock %} diff --git a/app/utils/browser.py b/app/utils/browser.py index 2f52457..e79862d 100644 --- a/app/utils/browser.py +++ b/app/utils/browser.py @@ -33,7 +33,7 @@ from flask import current_app from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError from app.utils.io_helpers import safe_write -from app.enrichment import enrich_url +from app.utils.enrichment import enrich_url from app.utils.settings import get_settings # Load settings once for constants / defaults @@ -202,85 +202,111 @@ class Browser: # ----------------------------------------------------------------------- # Form & Script analysis (plumbing only; detection is in the rules engine) # ----------------------------------------------------------------------- - def analyze_forms(self, html: str, base_url: str) -> List[Dict[str, Any]]: + def analyze_forms(self, html: str, base_url: str = "") -> List[Dict[str, Any]]: """ - Parse forms from the page HTML and apply rule-based checks (engine), keeping - only simple plumbing heuristics here (no security logic). + Collect form artifacts and evaluate per-form matches via the rules engine. + Only include rows that matched at least one rule. - Returns list of dicts with keys: - - action, method, inputs - - flagged (bool), flag_reasons (list[str]), status (str) - - rule_checks: {'checks': [...], 'summary': {...}} (per-form snippet evaluation) + Returns list of dicts with keys (per matched form): + - type: "form" + - action, method, inputs + - content_snippet: str + - rules: List[{name, description, severity?, tags?}] """ soup = BeautifulSoup(html, "lxml") - forms_info: List[Dict[str, Any]] = [] - page_hostname = urlparse(base_url).hostname + results: List[Dict[str, Any]] = [] + + engine = self._get_rule_engine() + base_hostname = urlparse(base_url).hostname or "" + # Match how scripts picks preview len + try: + preview_len = getattr(settings.ui, "snippet_preview_len", 200) # keep parity with scripts + except Exception: + preview_len = 200 for form in soup.find_all("form"): - action = form.get("action") - method = form.get("method", "get").lower() + try: + action = (form.get("action") or "").strip() + method = (form.get("method") or "get").strip().lower() - inputs: List[Dict[str, Any]] = [] - for inp in form.find_all("input"): - input_name = inp.get("name") - input_type = inp.get("type", "text") - inputs.append({"name": input_name, "type": input_type}) + inputs: List[Dict[str, Any]] = [] + for inp in form.find_all("input"): + inputs.append({ + "name": inp.get("name"), + "type": (inp.get("type") or "text").strip().lower(), + }) - flagged_reasons: List[str] = [] + # Use the actual form markup for regex rules + form_markup = str(form) + # UI-friendly snippet + content_snippet = form_markup[:preview_len] - if not action or str(action).strip() == "": - flagged_reasons.append("No action specified") - else: + matches: List[Dict[str, Any]] = [] + if engine is not None: + for r in getattr(engine, "rules", []): + if getattr(r, "category", None) != "form": + continue + rtype = getattr(r, "rule_type", None) + + try: + ok = False + reason = "" + if rtype == "regex": + # Run against the raw form HTML + ok, reason = r.run(form_markup) + elif rtype == "function": + # Structured facts for function-style rules + facts = { + "category": "form", + "base_url": base_url, + "base_hostname": base_hostname, + "action": action, + "action_hostname": urlparse(action).hostname or "", + "method": method, + "inputs": inputs, + "markup": form_markup, + } + ok, reason = 
r.run(facts) + else: + continue + + if ok: + matches.append({ + "name": getattr(r, "name", "unknown_rule"), + "description": (reason or "") or getattr(r, "description", ""), + "severity": getattr(r, "severity", None), + "tags": getattr(r, "tags", None), + }) + except Exception as rule_exc: + # Be defensive—bad rule shouldn't break the form pass + try: + self.logger.debug("Form rule error", extra={"rule": getattr(r, "name", "?"), "error": str(rule_exc)}) + except Exception: + pass + continue + + if matches: + results.append({ + "type": "form", + "action": action, + "method": method, + "inputs": inputs, + "content_snippet": content_snippet, + "rules": matches, + }) + + except Exception as exc: + # Keep analysis resilient try: - action_host = urlparse(action).hostname - if not str(action).startswith("/") and action_host != page_hostname: - flagged_reasons.append("Submits to a different host") + self.logger.error("Form analysis error", extra={"error": str(exc)}) except Exception: pass + results.append({ + "type": "form", + "heuristics": [f"Form analysis error: {exc}"], + }) - try: - if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https": - flagged_reasons.append("Submits over insecure HTTP") - except Exception: - pass - - for hidden in form.find_all("input", type="hidden"): - name_value = hidden.get("name") or "" - if "password" in name_value.lower(): - flagged_reasons.append("Hidden password field") - - flagged = bool(flagged_reasons) - - # Serialize a simple form snippet for rule category='form' - snippet_lines = [] - snippet_lines.append(f"base_url={base_url}") - snippet_lines.append(f"base_hostname={page_hostname}") - snippet_lines.append(f"action={action}") - snippet_lines.append(f"method={method}") - snippet_lines.append("inputs=") - - i = 0 - n = len(inputs) - while i < n: - item = inputs[i] - snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}") - i = i + 1 - form_snippet = "\n".join(snippet_lines) - - # Per-form rule checks (PASS/FAIL list via engine) - rule_checks = self.run_rule_checks(form_snippet, category="form") - - forms_info.append({ - "action": action, - "method": method, - "inputs": inputs, - "flagged": flagged, - "flag_reasons": flagged_reasons, - "status": "flagged" if flagged else "possibly safe", - "rule_checks": rule_checks - }) - - return forms_info + return results def analyze_scripts(self, html: str, base_url: str = "") -> List[Dict[str, Any]]: """ @@ -370,7 +396,7 @@ class Browser: Writes: - /data//screenshot.png - - /data//source.txt + - /data//source.html - /data//results.json (single source of truth for routes) Returns: @@ -381,7 +407,7 @@ class Browser: run_dir.mkdir(parents=True, exist_ok=True) screenshot_path = run_dir / "screenshot.png" - source_path = run_dir / "source.txt" + source_path = run_dir / "source.html" results_path = run_dir / "results.json" redirects: List[Dict[str, Any]] = [] diff --git a/app/utils/enrichment.py b/app/utils/enrichment.py index 4c88ef2..dea1378 100644 --- a/app/utils/enrichment.py +++ b/app/utils/enrichment.py @@ -9,8 +9,8 @@ from ipaddress import ip_address import socket # Local imports -from .utils.cache_db import get_cache -from .utils.settings import get_settings +from app.utils.cache_db import get_cache +from app.utils.settings import get_settings # Configure logging logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") @@ -39,9 +39,6 @@ def enrich_url(url: str) -> dict: # --- GeoIP --- result["geoip"] = enrich_geoip(hostname) - # --- BEC Words --- - 
result["bec_words"] = [w for w in BEC_WORDS if w.lower() in url.lower()] - return result diff --git a/app/utils/external_fetcher.py b/app/utils/external_fetcher.py new file mode 100644 index 0000000..81f79be --- /dev/null +++ b/app/utils/external_fetcher.py @@ -0,0 +1,338 @@ +# sneakyscope/app/utils/external_fetch.py +import hashlib +import os +import logging +from dataclasses import dataclass +from typing import Optional, Tuple, List +from urllib.parse import urljoin, urlparse + +import requests + +from app.utils.settings import get_settings + +settings = get_settings() + +_ALLOWED_SCHEMES = {"http", "https"} + + +@dataclass +class FetchResult: + """ + Outcome for a single external script fetch. + """ + ok: bool + reason: str + source_url: str + final_url: str + status_code: Optional[int] + content_type: Optional[str] + bytes_fetched: int + truncated: bool + sha256_hex: Optional[str] + saved_path: Optional[str] + + +class ExternalScriptFetcher: + """ + Minimal, safe-by-default fetcher for external JS files. + + Notes / assumptions: + - All artifacts for this run live under the UUID-backed `results_path` you pass in. + - Saves bytes to: /.js + - Manual redirects up to `max_redirects`. + - Streaming with a hard byte cap derived from `max_total_mb`. + - Never raises network exceptions to callers; failures are encoded in FetchResult. + - Settings are read from get_settings()['external_script_fetch'] with sane defaults. + """ + + def __init__(self, results_path: str, session: Optional[requests.Session] = None): + """ + Args: + results_path: Absolute path to the run's UUID directory (e.g., /data/). + session: Optional requests.Session to reuse connections; a new one is created if not provided. + """ + # Derived value: MiB -> bytes + self.max_total_bytes: int = settings.external_fetch.max_total_mb * 1024 * 1024 + + # Logger + self.logger = logging.getLogger(__file__) + + # Where to write artifacts for this job/run (UUID directory) + self.results_path = results_path + + # HTTP session with a predictable UA + self.session = session or requests.Session() + self.session.headers.update({"User-Agent": "SneakyScope/1.0"}) + + # ------------------------- + # Internal helper methods + # ------------------------- + + def _timeout(self) -> Tuple[float, float]: + """ + Compute (connect_timeout, read_timeout) in seconds from max_time_ms. + Keeps a conservative split so either phase gets a fair chance. + """ + total = max(0.1, settings.external_fetch.max_time_ms / 1000.0) + connect = min(1.5, total * 0.5) # cap connect timeout + read = max(0.5, total * 0.5) # floor read timeout + return (connect, read) + + def _scheme_allowed(self, url: str) -> bool: + """ + Return True if URL uses an allowed scheme (http/https). + """ + scheme = (urlparse(url).scheme or "").lower() + return scheme in _ALLOWED_SCHEMES + + def _artifact_path(self, index: int) -> str: + """ + Build an output path like: + /.js + + Ensures the directory exists. + """ + base_dir = os.path.join(self.results_path) + # Make sure parent directories exist (idempotent) + os.makedirs(base_dir, exist_ok=True) + filename = f"{index}.js" + return os.path.join(base_dir, filename) + + # ------------------------- + # Public API + # ------------------------- + + def fetch_one(self, script_url: str, index: int) -> FetchResult: + """ + Fetch exactly one external script with manual redirect handling and a hard per-file byte cap. + + Args: + script_url: The script URL to retrieve. + index: Numeric index used solely for naming the artifact file (.js). 
+ + Returns: + FetchResult with status, metadata, and saved path (if successful). + """ + # Feature gate: allow callers to rely on a consistent failure when globally disabled. + if not settings.external_fetch.enabled: + return FetchResult( + ok=False, + reason="Feature disabled", + source_url=script_url, + final_url=script_url, + status_code=None, + content_type=None, + bytes_fetched=0, + truncated=False, + sha256_hex=None, + saved_path=None, + ) + + # Scheme guard: refuse anything not http/https in this v1. + if not self._scheme_allowed(script_url): + return FetchResult( + ok=False, + reason="Scheme not allowed", + source_url=script_url, + final_url=script_url, + status_code=None, + content_type=None, + bytes_fetched=0, + truncated=False, + sha256_hex=None, + saved_path=None, + ) + + current_url = script_url + status_code: Optional[int] = None + content_type: Optional[str] = None + redirects_followed = 0 + + # Manual redirect loop so we can enforce max_redirects precisely. + while True: + try: + resp = self.session.get( + current_url, + stream=True, + allow_redirects=False, + timeout=self._timeout(), + ) + except requests.exceptions.Timeout: + return FetchResult( + ok=False, + reason="Timeout", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=0, + truncated=False, + sha256_hex=None, + saved_path=None, + ) + except requests.exceptions.RequestException as e: + return FetchResult( + ok=False, + reason=f"Network error: {e.__class__.__name__}", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=0, + truncated=False, + sha256_hex=None, + saved_path=None, + ) + + status_code = resp.status_code + content_type = resp.headers.get("Content-Type") + + # Handle redirects explicitly (3xx with Location) + if status_code in (301, 302, 303, 307, 308) and "Location" in resp.headers: + if redirects_followed >= settings.external_fetch.max_redirects: + return FetchResult( + ok=False, + reason="Max redirects exceeded", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=0, + truncated=False, + sha256_hex=None, + saved_path=None, + ) + next_url = urljoin(current_url, resp.headers["Location"]) + if not self._scheme_allowed(next_url): + return FetchResult( + ok=False, + reason="Redirect to disallowed scheme", + source_url=script_url, + final_url=next_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=0, + truncated=False, + sha256_hex=None, + saved_path=None, + ) + current_url = next_url + redirects_followed += 1 + # Loop to follow next hop + continue + + # Not a redirect: stream response body with a hard byte cap. 
+ cap = self.max_total_bytes + total = 0 + truncated = False + chunks: List[bytes] = [] + + try: + for chunk in resp.iter_content(chunk_size=8192): + if not chunk: + # Skip keep-alive chunks + continue + new_total = total + len(chunk) + if new_total > cap: + # Only keep what fits and stop + remaining = cap - total + if remaining > 0: + chunks.append(chunk[:remaining]) + total += remaining + truncated = True + break + chunks.append(chunk) + total = new_total + except requests.exceptions.Timeout: + return FetchResult( + ok=False, + reason="Timeout while reading", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=total, + truncated=truncated, + sha256_hex=None, + saved_path=None, + ) + except requests.exceptions.RequestException as e: + return FetchResult( + ok=False, + reason=f"Network error while reading: {e.__class__.__name__}", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=total, + truncated=truncated, + sha256_hex=None, + saved_path=None, + ) + + data = b"".join(chunks) + if not data: + return FetchResult( + ok=False, + reason="Empty response", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=0, + truncated=False, + sha256_hex=None, + saved_path=None, + ) + + # Persist to /.js + out_path = self._artifact_path(index) + try: + with open(out_path, "wb") as f: + f.write(data) + except OSError as e: + return FetchResult( + ok=False, + reason=f"Write error: {e.__class__.__name__}", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=total, + truncated=truncated, + sha256_hex=None, + saved_path=None, + ) + + sha256_hex = hashlib.sha256(data).hexdigest() + + # Structured log line for visibility/metrics + try: + self.logger.info( + "External script fetched", + extra={ + "source_url": script_url, + "final_url": current_url, + "status": status_code, + "bytes": total, + "truncated": truncated, + "sha256": sha256_hex, + "saved_path": out_path, + }, + ) + except Exception: + # Logging should never break the pipeline + pass + + return FetchResult( + ok=True, + reason="OK", + source_url=script_url, + final_url=current_url, + status_code=status_code, + content_type=content_type, + bytes_fetched=total, + truncated=truncated, + sha256_hex=sha256_hex, + saved_path=out_path, + ) diff --git a/app/utils/settings.py b/app/utils/settings.py index dd0cb73..0b28e70 100644 --- a/app/utils/settings.py +++ b/app/utils/settings.py @@ -39,6 +39,14 @@ BASE_DIR = Path(__file__).resolve().parent.parent DEFAULT_SETTINGS_FILE = BASE_DIR / "config" / "settings.yaml" # ---------- CONFIG DATA CLASSES ---------- +@dataclass +class External_FetchConfig: + enabled: bool = True + max_total_mb: int = 5 + max_time_ms: int = 3000 + max_redirects: int = 3 + concurrency: int = 3 + @dataclass class UIConfig: snippet_preview_len: int = 160 @@ -61,6 +69,7 @@ class AppConfig: class Settings: cache: Cache_Config = field(default_factory=Cache_Config) ui: UIConfig = field(default_factory=UIConfig) + external_fetch: External_FetchConfig = field(default_factory=External_FetchConfig) app: AppConfig = field(default_factory=AppConfig) @classmethod diff --git a/docs/roadmap.md b/docs/roadmap.md index c3c0366..1d7a86c 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -1,23 +1,18 @@ # SneakyScope — Roadmap (Updated 8-21-25) ## Priority 1 – Core Analysis / 
Stability - -* Opt-in “fetch external scripts” mode (off by default): on submission, download external script content (size/time limits) and run rules on fetched content. -* Remove remaining legacy form “flagged\_reasons” plumbing once all equivalent function rules are in place. -* Unit tests: YAML compilation, function-rule adapters, and per-script/per-form rule cases. * SSL/TLS intelligence: for HTTPS targets, pull certificate details from crt.sh (filtering expired); if a subdomain, also resolve the root domain to capture any wildcard certificates; probe the endpoint to enumerate supported TLS versions/ciphers and flag weak/legacy protocols. ## Priority 2 – API Layer * API endpoints: `/screenshot`, `/source`, `/analyse`. -* OpenAPI spec: create `openapi/openapi.yaml` and serve at `/api/openapi.yaml`. +* **OpenAPI**: add `POST /api/analyze_script` (request/response schemas, examples) to `openapi/openapi.yaml`; serve at `/api/openapi.yaml`. * Docs UI: Swagger UI or Redoc at `/docs`. +* (Nice-to-have) API JSON error consistency: handlers for 400/403/404/405/500 that always return JSON. ## Priority 3 – UI / UX * Front page/input handling: auto-prepend `http://`/`https://`/`www.` for bare domains. -* Source code viewer: embed page source in an editor view for readability. -* Scripts table: toggle between “Only suspicious” and “All scripts”. * Rules Lab (WYSIWYG tester): paste a rule, validate/compile, run against sample text; lightweight nav entry. ## Priority 4 – Artifact Management & Ops @@ -33,6 +28,6 @@ * Domain reputation (local feeds): build and refresh a consolidated domain/URL reputation store from URLHaus database dump and OpenPhish community dataset (scheduled pulls with dedup/normalize). * Threat intel connectors (settings-driven): add `settings.yaml` entries for VirusTotal and ThreatFox API keys (plus future providers); when present, enrich lookups and merge results into the unified reputation checks during analysis. -## Backlog / Far‑Off Plans +## Backlog / Far-Off Plans * Server profile scan: run a lightweight nmap service/banner scan on common web/alt ports (80, 443, 8000, 8080, 8443, etc.) and SSH; combine with server headers to infer stack (e.g., IIS vs. Linux/\*nix).
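
For anyone who wants to exercise the new endpoint by hand, the sketch below mirrors the request/response contract documented at the top of `app/blueprints/api.py`. The base URL, job UUID, and script URL are illustrative placeholders, and it assumes a locally running instance with `external_script_fetch.enabled: True` in `settings.yaml`.

```python
# Minimal sketch: call POST /api/analyze_script on a local dev instance.
# BASE_URL, the job UUID, and the script URL are placeholders — adjust for your setup.
import requests

BASE_URL = "http://localhost:5000"  # assumed local dev host/port

payload = {
    "job_id": "00000000-0000-0000-0000-000000000000",  # UUID of an existing run directory
    "url": "https://cdn.example.com/app.js",            # external script to fetch and scan
    "category": "script",                               # optional; the API defaults to "script"
}

resp = requests.post(f"{BASE_URL}/api/analyze_script", json=payload, timeout=30)
data = resp.json()

if not data.get("ok"):
    print(f"analysis failed (HTTP {resp.status_code}): {data.get('error')}")
else:
    print(f"{data['final_url']} -> {data['bytes']} bytes, sha256={data['sha256']}")
    # Only rules that matched (FAIL results) are surfaced in "findings".
    for finding in data.get("findings", []):
        print(f"  [{finding.get('severity')}] {finding.get('name')}: {finding.get('reason')}")
```

Because the blueprint registers JSON error handlers for 400/403/404/405/500, client and server errors from these routes also come back as `{"ok": false, "error": ...}`, so the `ok`/`error` branch above covers those paths too.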