From 1eb2a52f178ded40688773286b3ac2d9421a6a17 Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Wed, 20 Aug 2025 21:33:30 -0500 Subject: [PATCH] feat(engine,ui): unify detection in rules engine, add function rules & per-script matches; improve scripts table UX MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Core changes - Centralize detection in the Rules Engine; browser.py now focuses on fetch/extract/persist. - Add class-based adapters: - FactAdapter: converts snippets → structured facts. - FunctionRuleAdapter: wraps dict-based rule functions for engine input (str or dict). - Register function rules (code-based) alongside YAML rules: - form_action_missing - form_http_on_https_page - form_submits_to_different_host - script_src_uses_data_or_blob - script_src_has_dangerous_extension - script_third_party_host Rules & YAML - Expand/normalize YAML rules with severities + tags; tighten patterns. - Add new regex rules: new_function_usage, unescape_usage, string_timer_usage, long_hex_constants. - Move iframe rule to `text` category. - Keep existing script/form/text rules; all compile under IGNORECASE. Browser / analysis refactor - browser.py: - Remove inline heuristics; rely on engine for PASS/FAIL, reason, severity, tags. - Build page-level overview (`rule_checks`) across categories. - Analyze forms: add `base_url` + `base_hostname` to snippet so function rules can evaluate; include per-form rule_checks. - Analyze scripts: **per-script evaluation**: - Inline -> run regex script rules on inline text. - External -> run function script rules with a facts dict (src/src_hostname/base_url/base_hostname). - Only include scripts that matched ≥1 rule; attach severity/tags to matches. - Persist single source of truth: `/data/<uuid>/results.json`. - Backward-compat: `fetch_page_artifacts(..., engine=...)` kwarg accepted/ignored. UI/UX - Suspicious Scripts table now shows only matched scripts. 
- Add severity badges and tag chips; tooltips show rule description. - Prevent table blowouts: - Fixed layout + ellipsis + wrapping helpers (`.scripts-table`, `.breakable`, `details pre.code`). - Shortened inline snippet preview (configurable). - Minor template niceties (e.g., rel="noopener" on external links where applicable). Config - Add `ui.snippet_preview_len` to settings.yaml; default 160. - Load into `app.config["SNIPPET_PREVIEW_LEN"]` and use in `analyze_scripts`. Init / wiring - Import and register function rules as `Rule(...)` objects (not dicts). - Hook Rules Engine to Flask logger for verbose/diagnostic output. - Log totals on startup; keep YAML path override via `SNEAKYSCOPE_RULES_FILE`. Bug fixes - Fix boot crash: pass `Rule` instances to `engine.add_rule()` instead of dicts. - Fix “N/A” in scripts table by actually computing per-script matches. - Ensure form rules fire by including `base_url`/`base_hostname` in form snippets. Roadmap - Update roadmap to reflect completed items: - “Show each check and whether it triggered (pass/fail list per rule)” - Severity levels + tags in Suspicious Scripts - Results.json as route source of truth - Scripts table UX (badges, tooltips, layout fix) --- app/__init__.py | 130 +++++-- app/browser.py | 529 ++++++++++++++++---------- app/config/settings.yaml | 5 +- app/config/suspicious_rules.yaml | 98 ++++- app/routes.py | 2 +- app/rules/function_rules.py | 203 ++++++++++ app/static/style.css | 64 ++++ app/templates/result.html | 40 +- app/utils/rules_engine.py | 323 ++++++++++++---- app/utils/settings.py | 5 + docs/Feature Session Plan Template.md | 31 ++ docs/README.md | 6 + docs/roadmap.md | 83 ++-- docs/workflow.md | 12 + 14 files changed, 1108 insertions(+), 423 deletions(-) create mode 100644 app/rules/function_rules.py create mode 100644 docs/Feature Session Plan Template.md create mode 100644 docs/README.md create mode 100644 docs/workflow.md diff --git a/app/__init__.py b/app/__init__.py index 1d63dbf..7188aad 
100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,16 +1,3 @@ -""" -app/__init__.py - -Application factory and startup hooks for SneakyScope. - -Responsibilities: -- Create the Flask app. -- Load settings (YAML -> dataclasses) with safe defaults. -- Initialize and load the Suspicious Rules Engine from YAML. -- Register blueprints (routes). -- Configure core paths (e.g., SANDBOX_STORAGE). -""" - import os import logging from pathlib import Path @@ -18,11 +5,24 @@ from flask import Flask # Local imports from .utils.settings import get_settings -from .utils import io_helpers # if you need logging/setup later -from .utils import cache_db # available for future injections -from .utils.rules_engine import RuleEngine, load_rules_from_yaml # rules engine +from .utils.rules_engine import RuleEngine, load_rules_from_yaml, Rule + +# our code based rules +from .rules.function_rules import ( + FactAdapter, + FunctionRuleAdapter, + script_src_uses_data_or_blob, + script_src_has_dangerous_extension, + script_third_party_host, + form_submits_to_different_host, + form_http_on_https_page, + form_action_missing, +) + from . 
import routes # blueprint +# from .utils import io_helpers # if need logging/setup later +# from .utils import cache_db # available for future injections def create_app() -> Flask: """ @@ -37,46 +37,110 @@ def create_app() -> Flask: # Load settings (safe fallback to defaults if file missing) settings = get_settings() - # Secret key loaded from env + # Secret key loaded from env (warn if missing) app.secret_key = os.getenv("SECRET_KEY") + if not app.secret_key: + app.logger.warning("[init] SECRET_KEY is not set; sessions may be insecure in production.") # Configure storage directory (bind-mount is still handled by sandbox.sh) sandbox_storage_default = Path("/data") app.config["SANDBOX_STORAGE"] = str(sandbox_storage_default) - # Initialize Suspicious Rules Engine at startup - # Determine rules file path relative to this package - base_dir = Path(__file__).resolve().parent - rules_path = base_dir / "config" / "suspicious_rules.yaml" + # --------------------------- + # Suspicious Rules Engine + # --------------------------- - # Create an engine instance (even if file missing, we still want an engine) - engine = RuleEngine() + # Determine rules file path relative to this package (allow env override) + base_dir = Path(__file__).resolve().parent + default_rules_path = base_dir / "config" / "suspicious_rules.yaml" + rules_path_str = os.getenv("SNEAKYSCOPE_RULES_FILE", str(default_rules_path)) + rules_path = Path(rules_path_str) + + # Create engine bound to Flask logger so all verbose/debug goes to app.logger + engine = RuleEngine(rules=[], logger=app.logger) # Try to load from YAML if present; log clearly if not if rules_path.exists(): try: - loaded_rules = load_rules_from_yaml(rules_path) - # Add rules one-by-one (explicit) - for rule in loaded_rules: - engine.add_rule(rule) - app.logger.info(f"[+] Loaded {len(loaded_rules)} suspicious rules from {rules_path}") + loaded_rules = load_rules_from_yaml(rules_path, logger=app.logger) + # Add rules one-by-one (explicit, 
clearer logs if any rule fails to compile) + index = 0 + total = len(loaded_rules) + while index < total: + engine.add_rule(loaded_rules[index]) + index = index + 1 + app.logger.info(f"[init] Loaded {len(loaded_rules)} suspicious rules from {rules_path}") except Exception as e: - app.logger.warning(f"[!] Failed loading rules from {rules_path}: {e}") + app.logger.warning(f"[init] Failed loading rules from {rules_path}: {e}") else: - app.logger.warning(f"[!] Rules file not found at {rules_path}. Engine will start with zero rules.") + app.logger.warning(f"[init] Rules file not found at {rules_path}. Engine will start with zero rules.") - # Store engine on app config so it is accessible via current_app + # Built-in function-based rules + adapter = FactAdapter(logger=app.logger) + + engine.add_rule(Rule( + name="form_action_missing", + description="Form has no action attribute", + category="form", + rule_type="function", + function=FunctionRuleAdapter(form_action_missing, category="form", adapter=adapter), + )) + + engine.add_rule(Rule( + name="form_http_on_https_page", + description="Form submits via HTTP from HTTPS page", + category="form", + rule_type="function", + function=FunctionRuleAdapter(form_http_on_https_page, category="form", adapter=adapter), + )) + + engine.add_rule(Rule( + name="form_submits_to_different_host", + description="Form submits to a different host", + category="form", + rule_type="function", + function=FunctionRuleAdapter(form_submits_to_different_host, category="form", adapter=adapter), + )) + + # Script rules expect dict 'facts' (you’ll wire per-script facts later) + engine.add_rule(Rule( + name="script_src_uses_data_or_blob", + description="Script src uses data:/blob: URL", + category="script", + rule_type="function", + function=FunctionRuleAdapter(script_src_uses_data_or_blob, category="script", adapter=adapter), + )) + + engine.add_rule(Rule( + name="script_src_has_dangerous_extension", + description="External script with dangerous 
extension", + category="script", + rule_type="function", + function=FunctionRuleAdapter(script_src_has_dangerous_extension, category="script", adapter=adapter), + )) + + engine.add_rule(Rule( + name="script_third_party_host", + description="Script is from a third-party host", + category="script", + rule_type="function", + function=FunctionRuleAdapter(script_third_party_host, category="script", adapter=adapter), + )) + + # Store engine both ways: attribute (convenient) and config + app.rule_engine = engine app.config["RULE_ENGINE"] = engine - # Make app name/version available for templates here if you want it globally + # App metadata available to templates app.config["APP_NAME"] = settings.app.name app.config["APP_VERSION"] = f"v{settings.app.version_major}.{settings.app.version_minor}" # Register blueprints app.register_blueprint(routes.bp) - # Example log line so we know we booted cleanly + # Example log lines so we know we booted cleanly app.logger.info(f"SneakyScope started: {app.config['APP_NAME']} {app.config['APP_VERSION']}") app.logger.info(f"SANDBOX_STORAGE: {app.config['SANDBOX_STORAGE']}") + app.logger.info(f"Registered {len(engine.rules)} total rules (YAML + function)") return app diff --git a/app/browser.py b/app/browser.py index 4858f9b..ae08f63 100644 --- a/app/browser.py +++ b/app/browser.py @@ -1,18 +1,46 @@ -import re -import uuid -import json -from pathlib import Path -from bs4 import BeautifulSoup -from datetime import datetime -from urllib.parse import urlparse -from typing import Dict, Any, Optional -from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError +""" +app/browser.py -from flask import current_app # access the rule engine from app config +Page fetcher + analysis orchestrator for SneakyScope. +- Fetches a URL (HTML, redirects, etc.) 
+- Runs the Suspicious Rules Engine (PASS/FAIL for all rules) +- Writes artifacts (screenshot.png, source.txt, results.json) into /data// +- Returns a single 'result' dict suitable for UI and future API + +Design notes: +- Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules). +- This module keeps "plumbing" only (fetch, extract, persist). +- Minimal non-detection heuristics remain here (e.g., skip benign script MIME types). + +Assumptions: +- Flask app context is active (uses current_app for logger and RULE_ENGINE). +- SANDBOX_STORAGE is configured (default: /data). +- enrich_url(url) returns enrichment dict. +""" + +import json +import uuid +import re +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +from bs4 import BeautifulSoup +from flask import current_app +from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError from app.utils.io_helpers import safe_write from .enrichment import enrich_url +from .utils.settings import get_settings + +settings = get_settings() + + +# --------------------------------------------------------------------------- +# Engine access helpers +# --------------------------------------------------------------------------- + def get_rule_engine(): """ Retrieve the rules engine instance from the Flask application config. @@ -21,96 +49,158 @@ def get_rule_engine(): RuleEngine or None: The engine if available, or None if not configured. """ try: - # current_app is only available during an active request context - engine = current_app.config.get("RULE_ENGINE") - return engine + return current_app.config.get("RULE_ENGINE") except Exception: - # If called outside a Flask request context, fail gracefully return None -def run_rule_checks(text, category): +def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]: """ - Run all rules for a given category against the provided text. 
- - Args: - text (str): The content to test (e.g., form snippet, inline JS). - category (str): The rule category to run (e.g., 'form' or 'script'). + Summarize a list of engine rule result dicts (result = "PASS"|"FAIL"). Returns: - dict: { - "checks": [ { "rule": str, "category": str, "matched": bool, "reason": Optional[str] }, ... ], - "summary": { "matched_count": int, "total_rules": int } + {'fail_count': int, 'total_rules': int} + """ + summary = {"fail_count": 0, "total_rules": 0} + index = 0 + total = len(results) + while index < total: + item = results[index] + summary["total_rules"] = summary["total_rules"] + 1 + if str(item.get("result", "")).upper() == "FAIL": + summary["fail_count"] = summary["fail_count"] + 1 + index = index + 1 + return summary + + +def run_rule_checks(text: str, category: str) -> Dict[str, Any]: + """ + Run all rules for a given category against provided text, returning a table-friendly model. + + Args: + text: Text to analyze (HTML, snippet, etc.) + category: One of 'form', 'script', 'text' (or any category your rules use) + + Returns: + { + "checks": [ + { "name": str, "description": str, "category": str, + "result": "PASS"|"FAIL", "reason": Optional[str], + "severity": Optional[str], "tags": Optional[List[str]] }, ... 
+ ], + "summary": { "fail_count": int, "total_rules": int } } """ - result = { - "checks": [], - "summary": { - "matched_count": 0, - "total_rules": 0 - } - } - + out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}} engine = get_rule_engine() + if engine is None: - # No engine configured; return empty but well-formed structure - return result + return out try: - # Run engine rules for the specified category - check_results = engine.run_all(text, category=category) - - # Normalize results into the expected structure - total = 0 - matched = 0 - - for item in check_results: - # item is expected to contain: rule, category, matched, reason (optional) - total = total + 1 - if bool(item.get("matched")): - matched = matched + 1 - + engine_results = engine.run_all(text, category=category) # list of dicts + # Normalize explicitly + index = 0 + total = len(engine_results) + while index < total: + item = engine_results[index] normalized = { - "rule": item.get("rule"), + "name": item.get("name"), + "description": item.get("description"), "category": item.get("category"), - "matched": bool(item.get("matched")), - "reason": item.get("reason") + "result": item.get("result"), # "PASS" | "FAIL" + "reason": item.get("reason"), # present on FAIL by engine design + "severity": item.get("severity"), + "tags": item.get("tags"), } - result["checks"].append(normalized) + out["checks"].append(normalized) + index = index + 1 - result["summary"]["matched_count"] = matched - result["summary"]["total_rules"] = total - - except Exception as e: - # If anything goes wrong, keep structure and add a fake failure note - result["checks"].append({ - "rule": "engine_error", + out["summary"] = _summarize_results(out["checks"]) + except Exception as exc: + # Preserve shape; record the error as a synthetic PASS (so UI doesn't break) + out["checks"].append({ + "name": "engine_error", + "description": "Rule engine failed during evaluation", "category": category, - "matched": 
False, - "reason": f"Rule engine error: {e}" + "result": "PASS", + "reason": f"{exc}", + "severity": None, + "tags": None }) - result["summary"]["matched_count"] = 0 - result["summary"]["total_rules"] = 0 + out["summary"] = {"fail_count": 0, "total_rules": 1} - return result + return out -def analyze_forms(html: str, base_url: str): +def build_rule_checks_overview(full_html_text: str) -> List[Dict[str, Any]]: """ - Parse forms from the page HTML and apply heuristic flags and rule-based checks. - - Args: - html (str): The full page HTML. - base_url (str): The final URL of the page (used for hostname comparisons). + Build a top-level overview for the results page: runs each category across + the entire HTML and groups results by category. Returns: - list[dict]: A list of form analysis dictionaries, each including: - - action, method, inputs - - flagged (bool), flag_reasons (list[str]), status (str) - - rule_checks: dict with "checks" (list) and "summary" (dict) + [ + {"category": "script", "results": [ ...engine dicts... ], "summary": {...}}, + {"category": "form", "results": [ ... ], "summary": {...}}, + {"category": "text", "results": [ ... 
], "summary": {...}}, + ] + """ + overview: List[Dict[str, Any]] = [] + engine = get_rule_engine() + + categories = ["script", "form", "text"] + index = 0 + total = len(categories) + + while index < total: + cat = categories[index] + block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}} + + if engine is not None: + try: + results = engine.run_all(full_html_text, category=cat) + block["results"] = results + block["summary"] = _summarize_results(results) + except Exception as exc: + block["results"] = [{ + "name": "engine_error", + "description": "Rule engine failed during overview evaluation", + "category": cat, + "result": "PASS", + "reason": f"{exc}", + "severity": None, + "tags": None + }] + block["summary"] = {"fail_count": 0, "total_rules": 1} + + overview.append(block) + index = index + 1 + + return overview + + +# --------------------------------------------------------------------------- +# Form & Script analysis (plumbing only; detection is in the rules engine) +# --------------------------------------------------------------------------- + +def analyze_forms(html: str, base_url: str) -> List[Dict[str, Any]]: + """ + Parse forms from the page HTML and apply rule-based checks (engine), keeping + only simple plumbing heuristics here (no security logic). + + Returns list of dicts with keys: + - action, method, inputs + - flagged (bool), flag_reasons (list[str]), status (str) + - rule_checks: {'checks': [...], 'summary': {...}} (per-form snippet evaluation) + + Note: + The 'flagged' value is now purely a legacy visual hint based on simple + heuristics; the authoritative PASS/FAIL details are in rule_checks. + As you migrate heuristics into function rules, this 'flagged' may be + removed entirely. 
""" soup = BeautifulSoup(html, "lxml") - forms_info = [] + forms_info: List[Dict[str, Any]] = [] page_hostname = urlparse(base_url).hostname for form in soup.find_all("form"): @@ -118,40 +208,31 @@ def analyze_forms(html: str, base_url: str): method = form.get("method", "get").lower() # Build explicit inputs list - inputs = [] + inputs: List[Dict[str, Any]] = [] for inp in form.find_all("input"): input_name = inp.get("name") input_type = inp.get("type", "text") - inputs.append({ - "name": input_name, - "type": input_type - }) + inputs.append({"name": input_name, "type": input_type}) - flagged_reasons = [] + # Minimal legacy flags (kept for UI continuity; detection lives in engine) + flagged_reasons: List[str] = [] - # No action specified if not action or str(action).strip() == "": flagged_reasons.append("No action specified") - - # External host else: try: action_host = urlparse(action).hostname if not str(action).startswith("/") and action_host != page_hostname: flagged_reasons.append("Submits to a different host") except Exception: - # If hostname parsing fails, skip this condition quietly pass - # HTTP form on HTTPS page try: if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https": flagged_reasons.append("Submits over insecure HTTP") except Exception: - # If scheme parsing fails, ignore pass - # Hidden password / suspicious hidden inputs for hidden in form.find_all("input", type="hidden"): name_value = hidden.get("name") or "" if "password" in name_value.lower(): @@ -159,15 +240,23 @@ def analyze_forms(html: str, base_url: str): flagged = bool(flagged_reasons) - # Serialize a simple form snippet for the rules engine to analyze (category='form') + # Serialize a simple form snippet for rule category='form' snippet_lines = [] + snippet_lines.append(f"base_url={base_url}") + snippet_lines.append(f"base_hostname={page_hostname}") snippet_lines.append(f"action={action}") snippet_lines.append(f"method={method}") snippet_lines.append("inputs=") - 
for item in inputs: + + i = 0 + n = len(inputs) + while i < n: + item = inputs[i] snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}") + i = i + 1 form_snippet = "\n".join(snippet_lines) + # Per-form rule checks (PASS/FAIL list via engine) rule_checks = run_rule_checks(form_snippet, category="form") forms_info.append({ @@ -183,156 +272,116 @@ def analyze_forms(html: str, base_url: str): return forms_info -def analyze_scripts(html: str, base_url: str = "", engine=None) -> list[dict]: +def analyze_scripts(html: str, base_url: str = "") -> List[Dict[str, Any]]: """ - Analyze