feat(engine,ui): unify detection in rules engine, add function rules & per-script matches; improve scripts table UX

Core changes
- Centralize detection in the Rules Engine; browser.py now focuses on fetch/extract/persist.
- Add class-based adapters:
  - FactAdapter: converts snippets → structured facts.
  - FunctionRuleAdapter: wraps dict-based rule functions for engine input (str or dict).
- Register function rules (code-based) alongside YAML rules:
  - form_action_missing
  - form_http_on_https_page
  - form_submits_to_different_host
  - script_src_uses_data_or_blob
  - script_src_has_dangerous_extension
  - script_third_party_host
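
A function rule of this shape can be sketched as follows. This is a minimal illustration, not the project's actual implementation: the `facts` keys (`src_hostname`, `base_hostname`) and the `(bool, reason)` return shape are inferred from the `analyze_scripts` code in the diff below.

```python
def script_third_party_host(facts: dict):
    """Flag external scripts served from a host other than the page's own.

    Sketch only: key names and the (matched, reason) contract are assumptions
    based on how analyze_scripts builds facts and consumes Rule.run() results.
    """
    src_host = (facts.get("src_hostname") or "").lower()
    base_host = (facts.get("base_hostname") or "").lower()
    if src_host and base_host and src_host != base_host:
        return True, f"Third-party host: {src_host}"
    return False, None

# e.g. <script src="https://cdn.evil.example/x.js"> embedded on https://shop.example/
facts = {"src_hostname": "cdn.evil.example", "base_hostname": "shop.example"}
```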

Rules & YAML
- Expand/normalize YAML rules with severities + tags; tighten patterns.
- Add new regex rules: new_function_usage, unescape_usage, string_timer_usage, long_hex_constants.
- Move iframe rule to `text` category.
- Keep existing script/form/text rules; all compile under IGNORECASE.
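
For reference, one of the new regex rules might look like the entry below. The field names (`pattern`, `severity`, `tags`) are assumptions based on the attributes the engine exposes; the actual YAML schema may differ.

```yaml
# Hypothetical shape of a regex rule entry (field names inferred, not authoritative)
- name: unescape_usage
  category: script
  description: "Uses unescape() (legacy/obfuscation)"
  pattern: '\bunescape\s*\('   # compiled under IGNORECASE
  severity: medium
  tags: [obfuscation, legacy]
```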

Browser / analysis refactor
- browser.py:
  - Remove inline heuristics; rely on engine for PASS/FAIL, reason, severity, tags.
  - Build page-level overview (`rule_checks`) across categories.
  - Analyze forms: add `base_url` + `base_hostname` to snippet so function rules can evaluate; include per-form rule_checks.
  - Analyze scripts: **per-script evaluation**:
    - Inline -> run regex script rules on inline text.
    - External -> run function script rules with a facts dict (src/src_hostname/base_url/base_hostname).
    - Only include scripts that matched ≥1 rule; attach severity/tags to matches.
  - Persist single source of truth: `/data/<uuid>/results.json`.
  - Backward-compat: `fetch_page_artifacts(..., engine=...)` kwarg accepted/ignored.

UI/UX
- Suspicious Scripts table now shows only matched scripts.
- Add severity badges and tag chips; tooltips show rule description.
- Prevent table blowouts:
  - Fixed layout + ellipsis + wrapping helpers (`.scripts-table`, `.breakable`, `details pre.code`).
  - Shortened inline snippet preview (configurable).
- Minor template niceties (e.g., rel="noopener" on external links where applicable).

Config
- Add `ui.snippet_preview_len` to settings.yaml; default 160.
- Load into `app.config["SNIPPET_PREVIEW_LEN"]` and use in `analyze_scripts`.
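
The lookup can be reduced to a small helper like this sketch. The key path `ui.snippet_preview_len` and the 160 default come from the commit text; the helper name is hypothetical.

```python
DEFAULT_PREVIEW_LEN = 160  # default stated above

def snippet_preview_len(settings: dict) -> int:
    """Return ui.snippet_preview_len from parsed settings.yaml, with a safe default.

    'settings' is the dict produced by e.g. yaml.safe_load(); falls back to the
    default on missing keys or non-integer values.
    """
    ui = settings.get("ui") or {}
    try:
        return int(ui.get("snippet_preview_len", DEFAULT_PREVIEW_LEN))
    except (TypeError, ValueError):
        return DEFAULT_PREVIEW_LEN

# Usage in analyze_scripts (conceptually):
#   preview = inline_text[:snippet_preview_len(settings)].replace("\n", " ")
```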

Init / wiring
- Import and register function rules as `Rule(...)` objects (not dicts).
- Hook Rules Engine to Flask logger for verbose/diagnostic output.
- Log totals on startup; keep YAML path override via `SNEAKYSCOPE_RULES_FILE`.
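
The wiring fix can be illustrated with a sketch like the one below. The `Rule` constructor fields here are assumptions reconstructed from the attributes `analyze_scripts` reads (`name`, `category`, `rule_type`, `severity`, `tags`); they stand in for the project's real class.

```python
class Rule:
    """Stand-in for the project's Rule class (fields are assumptions)."""
    def __init__(self, name, category, rule_type, func,
                 description="", severity=None, tags=None):
        self.name = name
        self.category = category
        self.rule_type = rule_type   # "regex" or "function"
        self.func = func
        self.description = description
        self.severity = severity
        self.tags = tags

    def run(self, payload):
        # Delegate to the wrapped function; returns (matched, reason)
        return self.func(payload)

def register_function_rules(engine, funcs):
    """Wrap each rule function in a Rule before engine.add_rule().

    Passing raw dicts instead of Rule instances caused the boot crash
    noted under 'Bug fixes' below.
    """
    for fn in funcs:
        engine.add_rule(Rule(
            name=fn.__name__,
            category="script",
            rule_type="function",
            func=fn,
        ))
```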

Bug fixes
- Fix boot crash: pass `Rule` instances to `engine.add_rule()` instead of dicts.
- Fix “N/A” in scripts table by actually computing per-script matches.
- Ensure form rules fire by including `base_url`/`base_hostname` in form snippets.

Roadmap
- Update roadmap to reflect completed items:
  - “Show each check and whether it triggered (pass/fail list per rule)”
  - Severity levels + tags in Suspicious Scripts
  - Results.json as route source of truth
  - Scripts table UX (badges, tooltips, layout fix)
This commit is contained in:
2025-08-20 21:33:30 -05:00
parent 70d29f9f95
commit 1eb2a52f17
14 changed files with 1108 additions and 423 deletions

app/browser.py

@@ -1,18 +1,46 @@
"""
app/browser.py

Page fetcher + analysis orchestrator for SneakyScope.
- Fetches a URL (HTML, redirects, etc.)
- Runs the Suspicious Rules Engine (PASS/FAIL for all rules)
- Writes artifacts (screenshot.png, source.txt, results.json) into /data/<run_uuid>/
- Returns a single 'result' dict suitable for the UI and a future API

Design notes:
- Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules).
- This module keeps "plumbing" only (fetch, extract, persist).
- Minimal non-detection heuristics remain here (e.g., skipping benign script MIME types).

Assumptions:
- A Flask app context is active (uses current_app for the logger and RULE_ENGINE).
- SANDBOX_STORAGE is configured (default: /data).
- enrich_url(url) returns an enrichment dict.
"""
import json
import uuid
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

from bs4 import BeautifulSoup
from flask import current_app  # access the rule engine from app config
from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError

from app.utils.io_helpers import safe_write
from .enrichment import enrich_url
from .utils.settings import get_settings

settings = get_settings()

# ---------------------------------------------------------------------------
# Engine access helpers
# ---------------------------------------------------------------------------

def get_rule_engine():
    """
    Retrieve the rules engine instance from the Flask application config.
@@ -21,96 +49,158 @@ def get_rule_engine():
    Returns:
        RuleEngine or None: The engine if available, or None if not configured.
    """
    try:
        # current_app is only available inside an active application context
        return current_app.config.get("RULE_ENGINE")
    except Exception:
        # If called outside a Flask application context, fail gracefully
        return None


def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]:
    """
    Summarize a list of engine rule result dicts (result = "PASS"|"FAIL").

    Returns:
        {'fail_count': int, 'total_rules': int}
    """
    summary = {"fail_count": 0, "total_rules": 0}
    for item in results:
        summary["total_rules"] += 1
        if str(item.get("result", "")).upper() == "FAIL":
            summary["fail_count"] += 1
    return summary


def run_rule_checks(text: str, category: str) -> Dict[str, Any]:
    """
    Run all rules for a given category against the provided text, returning a
    table-friendly model.

    Args:
        text: Text to analyze (HTML, snippet, etc.)
        category: One of 'form', 'script', 'text' (or any category your rules use)

    Returns:
        {
            "checks": [
                { "name": str, "description": str, "category": str,
                  "result": "PASS"|"FAIL", "reason": Optional[str],
                  "severity": Optional[str], "tags": Optional[List[str]] }, ...
            ],
            "summary": { "fail_count": int, "total_rules": int }
        }
    """
    out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}}
    engine = get_rule_engine()
    if engine is None:
        # No engine configured; return an empty but well-formed structure
        return out
    try:
        engine_results = engine.run_all(text, category=category)  # list of dicts
        # Normalize explicitly
        for item in engine_results:
            out["checks"].append({
                "name": item.get("name"),
                "description": item.get("description"),
                "category": item.get("category"),
                "result": item.get("result"),      # "PASS" | "FAIL"
                "reason": item.get("reason"),      # present on FAIL by engine design
                "severity": item.get("severity"),
                "tags": item.get("tags"),
            })
        out["summary"] = _summarize_results(out["checks"])
    except Exception as exc:
        # Preserve shape; record the error as a synthetic PASS (so the UI doesn't break)
        out["checks"].append({
            "name": "engine_error",
            "description": "Rule engine failed during evaluation",
            "category": category,
            "result": "PASS",
            "reason": f"{exc}",
            "severity": None,
            "tags": None,
        })
        out["summary"] = {"fail_count": 0, "total_rules": 1}
    return out


def build_rule_checks_overview(full_html_text: str) -> List[Dict[str, Any]]:
    """
    Build a top-level overview for the results page: run each category across
    the entire HTML and group results by category.

    Returns:
        [
            {"category": "script", "results": [ ...engine dicts... ], "summary": {...}},
            {"category": "form",   "results": [ ... ], "summary": {...}},
            {"category": "text",   "results": [ ... ], "summary": {...}},
        ]
    """
    overview: List[Dict[str, Any]] = []
    engine = get_rule_engine()
    for cat in ("script", "form", "text"):
        block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}}
        if engine is not None:
            try:
                results = engine.run_all(full_html_text, category=cat)
                block["results"] = results
                block["summary"] = _summarize_results(results)
            except Exception as exc:
                block["results"] = [{
                    "name": "engine_error",
                    "description": "Rule engine failed during overview evaluation",
                    "category": cat,
                    "result": "PASS",
                    "reason": f"{exc}",
                    "severity": None,
                    "tags": None,
                }]
                block["summary"] = {"fail_count": 0, "total_rules": 1}
        overview.append(block)
    return overview


# ---------------------------------------------------------------------------
# Form & Script analysis (plumbing only; detection is in the rules engine)
# ---------------------------------------------------------------------------

def analyze_forms(html: str, base_url: str) -> List[Dict[str, Any]]:
    """
    Parse forms from the page HTML and apply rule-based checks (engine), keeping
    only simple plumbing heuristics here (no security logic).

    Returns a list of dicts with keys:
        - action, method, inputs
        - flagged (bool), flag_reasons (list[str]), status (str)
        - rule_checks: {'checks': [...], 'summary': {...}} (per-form snippet evaluation)

    Note:
        The 'flagged' value is now purely a legacy visual hint based on simple
        heuristics; the authoritative PASS/FAIL details are in rule_checks.
        As heuristics migrate into function rules, 'flagged' may be removed
        entirely.
    """
    soup = BeautifulSoup(html, "lxml")
    forms_info: List[Dict[str, Any]] = []
    page_hostname = urlparse(base_url).hostname
    for form in soup.find_all("form"):
@@ -118,40 +208,31 @@ def analyze_forms(html: str, base_url: str):
        method = form.get("method", "get").lower()

        # Build explicit inputs list
        inputs: List[Dict[str, Any]] = []
        for inp in form.find_all("input"):
            inputs.append({
                "name": inp.get("name"),
                "type": inp.get("type", "text"),
            })

        # Minimal legacy flags (kept for UI continuity; detection lives in engine)
        flagged_reasons: List[str] = []

        # No action specified
        if not action or str(action).strip() == "":
            flagged_reasons.append("No action specified")
        # External host
        else:
            try:
                action_host = urlparse(action).hostname
                if not str(action).startswith("/") and action_host != page_hostname:
                    flagged_reasons.append("Submits to a different host")
            except Exception:
                # If hostname parsing fails, skip this condition quietly
                pass

        # HTTP form on HTTPS page
        try:
            if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https":
                flagged_reasons.append("Submits over insecure HTTP")
        except Exception:
            # If scheme parsing fails, ignore
            pass

        # Hidden password / suspicious hidden inputs
        for hidden in form.find_all("input", type="hidden"):
            name_value = hidden.get("name") or ""
            if "password" in name_value.lower():
@@ -159,15 +240,23 @@ def analyze_forms(html: str, base_url: str):
        flagged = bool(flagged_reasons)

        # Serialize a simple form snippet for rule category='form'
        snippet_lines = []
        snippet_lines.append(f"base_url={base_url}")
        snippet_lines.append(f"base_hostname={page_hostname}")
        snippet_lines.append(f"action={action}")
        snippet_lines.append(f"method={method}")
        snippet_lines.append("inputs=")
        for item in inputs:
            snippet_lines.append(f"  - name={item.get('name')} type={item.get('type')}")
        form_snippet = "\n".join(snippet_lines)

        # Per-form rule checks (PASS/FAIL list via engine)
        rule_checks = run_rule_checks(form_snippet, category="form")

        forms_info.append({
@@ -183,156 +272,116 @@ def analyze_forms(html: str, base_url: str):
    return forms_info


def analyze_scripts(html: str, base_url: str = "") -> List[Dict[str, Any]]:
    """
    Collect script artifacts and evaluate per-script matches via the rules engine.

    Only include rows that matched at least one rule. Inline scripts are checked
    against regex rules using their text; external scripts are checked against
    function rules using a small 'facts' dict (src/hosts).

    Returns a list of dicts like:
        {
            "type": "external" | "inline" | "unknown",
            "src": "...",              # for external
            "content_snippet": "...",  # for inline
            "rules": [ { "name": "...", "description": "..." }, ... ]
        }
    """
    soup = BeautifulSoup(html, "lxml")
    results: List[Dict[str, Any]] = []

    # Benign MIME types we ignore entirely (non-detection plumbing)
    benign_types = {"application/ld+json", "application/json"}

    engine = get_rule_engine()
    base_hostname = urlparse(base_url).hostname or ""

    for script in soup.find_all("script"):
        try:
            src = (script.get("src") or "").strip()
            s_type_attr = (script.get("type") or "").strip().lower()
            # IMPORTANT: .string is often None; get_text() is reliable
            inline_text = script.get_text(strip=True) or ""

            # Skip benign structured data outright (noise control)
            if s_type_attr in benign_types:
                continue

            record: Dict[str, Any] = {}
            if src:
                record["type"] = "external"
                record["src"] = src
            elif inline_text:
                record["type"] = "inline"
                record["content_snippet"] = (inline_text[:settings.ui.snippet_preview_len]).replace("\n", " ")
            else:
                record["type"] = "unknown"

            # --- Per-script evaluation: gather matches from engine rules
            matches: List[Dict[str, str]] = []
            if engine is not None:
                # Inline content -> run regex script rules against the text
                if inline_text:
                    for r in engine.rules:
                        if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "regex":
                            ok, reason = r.run(inline_text)
                            if ok:
                                matches.append({
                                    "name": getattr(r, "name", "unknown_rule"),
                                    "description": getattr(r, "description", "") or (reason or ""),
                                    "severity": getattr(r, "severity", None),
                                    "tags": getattr(r, "tags", None),
                                })
                # External src -> run function script rules with a facts dict
                if src:
                    facts = {
                        "src": src,
                        "base_url": base_url,
                        "base_hostname": base_hostname,
                        "src_hostname": urlparse(src).hostname or "",
                        "category": "script",
                    }
                    for r in engine.rules:
                        if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "function":
                            ok, reason = r.run(facts)
                            if ok:
                                matches.append({
                                    "name": getattr(r, "name", "unknown_rule"),
                                    "description": (reason or "") or getattr(r, "description", ""),
                                    "severity": getattr(r, "severity", None),
                                    "tags": getattr(r, "tags", None),
                                })

            # Only keep rows that matched at least one rule
            if matches:
                record["rules"] = matches
                results.append(record)
        except Exception as exc:
            # Never let a single broken <script> kill the whole analysis
            results.append({
                "type": "unknown",
                "heuristics": [f"Script analysis error: {exc}"]
            })

    return results


# ---------------------------------------------------------------------------
# Fetcher / Orchestrator
# ---------------------------------------------------------------------------

async def fetch_page_artifacts(url: str, storage_dir: Path) -> Dict[str, Any]:
    """
    Fetch page artifacts and save them in a UUID-based directory.

    Args:
        url (str): URL to analyze.
        storage_dir (Path): Base /data path.

    Writes:
        - /data/<uuid>/screenshot.png
        - /data/<uuid>/source.txt
        - /data/<uuid>/results.json (single source of truth for routes)

    Returns:
        A result dict with keys used by templates (and a future API).
    """
    run_uuid = str(uuid.uuid4())
    run_dir = storage_dir / run_uuid
@@ -342,9 +391,9 @@ async def fetch_page_artifacts(url: str, storage_dir: Path, engine=None) -> Dict
    source_path = run_dir / "source.txt"
    results_path = run_dir / "results.json"

    redirects: List[Dict[str, Any]] = []
    downloads: List[Dict[str, Any]] = []
    scripts_seen: List[str] = []

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(
@@ -359,10 +408,30 @@ async def fetch_page_artifacts(url: str, storage_dir: Path, engine=None) -> Dict
        )
        page = await context.new_page()

        # Event handlers (plumbing)
        def _on_response(resp):
            try:
                if 300 <= resp.status <= 399:
                    redirects.append({"status": resp.status, "url": resp.url})
            except Exception:
                pass

        def _on_download(d):
            try:
                downloads.append({"url": d.url, "suggested_filename": d.suggested_filename})
            except Exception:
                pass

        def _on_request(r):
            try:
                if r.url.endswith((".js", ".vbs", ".hta")):
                    scripts_seen.append(r.url)
            except Exception:
                pass

        page.on("response", _on_response)
        page.on("download", _on_download)
        page.on("request", _on_request)

        try:
            await page.goto(url, wait_until="networkidle", timeout=60000)
@@ -378,23 +447,65 @@ async def fetch_page_artifacts(url: str, storage_dir: Path, engine=None) -> Dict
        await context.close()
        await browser.close()

    # Read back saved source
    html_content = source_path.read_text(encoding="utf-8")

    # Forms analysis (per-form rule checks)
    forms_info = analyze_forms(html_content, final_url)

    # Scripts artifacts (no detection here)
    suspicious_scripts = analyze_scripts(html_content, base_url=final_url)

    # Enrichment
    enrichment = enrich_url(url)

    # Global PASS/FAIL table per category (entire document)
    rule_checks_overview = build_rule_checks_overview(html_content)
    for blk in rule_checks_overview:
        current_app.logger.debug(f"[rules] {blk['category']}: {blk['summary']}")

    # Assemble single result dict
    result: Dict[str, Any] = {
        "uuid": run_uuid,
        "submitted_url": url,
        "final_url": final_url,
        "redirects": redirects,
        "downloads": downloads,
        "scripts": scripts_seen,
        "forms": forms_info,
        "suspicious_scripts": suspicious_scripts,
        "rule_checks": rule_checks_overview,  # table-ready for the UI
        "enrichment": enrichment,
    }

    # Persist as the single source of truth for routes
    safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False))
    try:
        current_app.logger.info(f"[browser] Saved results.json for run {run_uuid}")
    except Exception:
        pass

    return result


def load_results(storage_dir: Path, run_uuid: str) -> Optional[Dict[str, Any]]:
    """
    Load a prior run's results.json from /data/<uuid>/.

    Returns:
        dict or None
    """
    results_path = storage_dir / run_uuid / "results.json"
    if not results_path.exists():
        return None
    try:
        return json.loads(results_path.read_text(encoding="utf-8"))
    except Exception:
        return None