- Add SSL/TLS intelligence pipeline:
- crt.sh lookup with expired-filtering and root-domain wildcard resolution
- live TLS version/cipher probe with weak/legacy flags and probe notes
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data
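The expired-filtering and wildcard resolution above can be sketched as below. `filter_unexpired` and `wildcard_root` are illustrative helper names (not the actual module API); the sketch assumes crt.sh's JSON fields `not_after` and `name_value`:

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Dict, List


def filter_unexpired(entries: List[Dict[str, Any]], now: datetime | None = None) -> List[Dict[str, Any]]:
    """Drop crt.sh entries whose `not_after` timestamp is already in the past."""
    now = now or datetime.now(timezone.utc)
    keep: List[Dict[str, Any]] = []
    for e in entries:
        try:
            # crt.sh emits naive ISO timestamps; treat them as UTC
            not_after = datetime.fromisoformat(e["not_after"]).replace(tzinfo=timezone.utc)
        except (KeyError, ValueError):
            continue  # unparseable entries are treated as expired
        if not_after >= now:
            keep.append(e)
    return keep


def wildcard_root(name_value: str) -> str:
    """Resolve a wildcard SAN like `*.example.com` to its root domain."""
    first = name_value.splitlines()[0].strip()
    return first[2:] if first.startswith("*.") else first
```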
- Introduce `URLNormalizer` with punycode support and typo repair
- Auto-prepend `https://` for bare domains (e.g., `google.com`)
- Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
- `get_url_normalizer()` reads defaults from Settings (if present)
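A minimal sketch of the normalization behavior described above (bare-domain `https://` prepending plus punycode), using the stdlib `idna` codec; `normalize_url` is a standalone illustration, not the `URLNormalizer` class itself, and omits typo repair and the reachability probe:

```python
from urllib.parse import urlsplit, urlunsplit


def normalize_url(raw: str) -> str:
    """Prepend https:// for bare domains and punycode the hostname."""
    candidate = raw.strip()
    if "://" not in candidate:
        candidate = "https://" + candidate  # bare domain, e.g. "google.com"
    parts = urlsplit(candidate)
    # IDNA-encode each label so internationalized hosts become xn-- form
    host = (parts.hostname or "").encode("idna").decode("ascii")
    netloc = host if not parts.port else f"{host}:{parts.port}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
```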
- Standardize function-rule return shape to `(bool, dict|None)` across `form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.)
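A rule in the standardized shape looks like the following; `form_action_offsite` is a hypothetical example rule, but the `(bool, dict|None)` contract and the `note` payload key are as described above:

```python
from typing import Any, Dict, Optional, Tuple


def form_action_offsite(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Hypothetical function rule: match when a form posts to a different host."""
    action_host = facts.get("action_hostname") or ""
    base_host = facts.get("base_hostname") or ""
    if action_host and base_host and action_host != base_host:
        # Match: return True plus a structured payload
        return True, {
            "note": "form posts to a different host than the page",
            "base_hostname": base_host,
            "action_hostname": action_host,
        }
    # Miss: always (False, None)
    return False, None
```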
- Harden `FunctionRuleAdapter`:
- Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs
- Adapt non-dict inputs to facts (category-aware and via provided adapter)
- Return `(True, dict)` on match, `(False, None)` on miss
- Bind-time logging with file:line + function id for diagnostics
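The legacy-return coercion can be sketched as a single normalization helper; `coerce_rule_return` is an illustrative name for logic that, per the bullets above, lives inside `FunctionRuleAdapter`:

```python
from typing import Any, Dict, Optional, Tuple


def coerce_rule_return(raw: Any) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Normalize legacy rule returns to the (bool, dict|None) contract."""
    if isinstance(raw, tuple):
        ok = bool(raw[0])
        detail = raw[1] if len(raw) > 1 else None
        if isinstance(detail, str):  # legacy (bool, str) shape
            detail = {"note": detail}
        # (True, dict) on match, (False, None) on miss
        return (True, detail or {}) if ok else (False, None)
    # legacy bare-bool shape
    return (True, {}) if bool(raw) else (False, None)
```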
- `RuleEngine`:
- Back rules by private `self._rules`; `rules` property returns copy
- Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
- Fix AttributeError from property assignment during `__init__`
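The private-backing-list pattern above can be sketched as follows (omitting the regex recompile step; the `name`-based dedup key is an assumption):

```python
class RuleEngine:
    """Sketch: private `_rules` list, copy-returning property, idempotent add_rule."""

    def __init__(self) -> None:
        # Assign the private field directly; the `rules` property is read-only,
        # so assigning self.rules here would raise AttributeError
        self._rules: list = []

    @property
    def rules(self) -> list:
        return list(self._rules)  # copy, so callers can't mutate engine state

    def add_rule(self, rule, replace: bool = False) -> bool:
        for i, existing in enumerate(self._rules):
            if existing.name == rule.name:
                if replace:
                    self._rules[i] = rule  # in-place replace keeps ordering
                    return True
                return False  # idempotent: duplicate add is a no-op
        self._rules.append(rule)
        return True
```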
- Replace hidden singleton factory with explicit builder + global state:
- `app/rules/factory.py::build_rules_engine()` builds and logs totals
- `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the single source of truth
- `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts
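The `app/state.py` surface is small enough to sketch; the function names come from the bullets above, while the internals (module-level global, fail-fast getter) are an assumption:

```python
from typing import Optional

_RULES_ENGINE: Optional[object] = None  # process-global, published once at preload


def set_rules_engine(engine: object) -> None:
    """Publish the engine built by build_rules_engine() during preload."""
    global _RULES_ENGINE
    _RULES_ENGINE = engine


def get_rules_engine() -> object:
    """Return the published engine; fail loudly if preload never ran."""
    if _RULES_ENGINE is None:
        raise RuntimeError("rules engine not initialized; build it at preload")
    return _RULES_ENGINE
```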
- Unify logging wiring:
- `wire_logging_once(app)` clears and attaches a single handler chain
- Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
- Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)
- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Document the foreground vs background log “double consumer” caveat (attach vs `compose logs`)
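The single-handler-chain wiring can be sketched as below; this is a simplified version that drops the `app` argument and hard-codes a format, so treat the formatter string and level as assumptions:

```python
import logging


def wire_logging_once() -> logging.Logger:
    """One handler chain, two named loggers, propagation off (no duplicate lines)."""
    fmt = logging.Formatter("%(process)d %(name)s %(levelname)s %(message)s")
    handler = logging.StreamHandler()
    handler.setFormatter(fmt)
    for name in ("sneakyscope.app", "sneakyscope.engine"):
        lg = logging.getLogger(name)
        lg.handlers.clear()      # remove stray/duplicate handlers first
        lg.addHandler(handler)
        lg.propagate = False     # don't bubble to root -> no double emission
        lg.setLevel(logging.INFO)
    return logging.getLogger("sneakyscope.app")
```

Because handlers are cleared before attaching, calling it again is safe: repeat calls never stack handlers.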
- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section
- Consumers should import the rules engine via:
- `from app.state import get_rules_engine`
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
then publish with `set_rules_engine()`. Do not call old singleton factories.
- New/changed modules (high level):
- `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
- `app/rules/function_rules.py` (±) — normalized payload returns
- `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
- `app/utils/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
- `app/rules/factory.py` (±) — pure builder; totals logged post-registration
- `app/state.py` (+) — process-global rules engine
- `app/logging_setup.py` (±) — single chain, two named loggers
- `app/wsgi.py` (±) — preload build + `set_rules_engine()`
- `entrypoint.sh` (±) — add `--preload`
- templates (±) — TLS card, raw toggle; front-page checkbox
Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.
"""
|
||
app/browser.py
|
||
|
||
Singleton, lazily-loaded page fetcher + analysis orchestrator for SneakyScope.
|
||
|
||
Responsibilities:
|
||
- Fetch a URL (HTML, redirects, etc.)
|
||
- Run the Suspicious Rules Engine (PASS/FAIL for all rules)
|
||
- Write artifacts (screenshot.png, source.txt, results.json) into /data/<run_uuid>/
|
||
- Return a single 'result' dict suitable for UI and future API
|
||
|
||
Design notes:
|
||
- Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules).
|
||
- This module keeps "plumbing" only (fetch, extract, persist).
|
||
- Minimal non-detection heuristics remain here (e.g., skip benign script MIME types).
|
||
|
||
Assumptions:
|
||
- Flask app context is active (uses current_app for logger and RULE_ENGINE).
|
||
- SANDBOX_STORAGE is configured (default: /data).
|
||
- enrich_url(url) returns enrichment dict.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional
|
||
from urllib.parse import urlparse
|
||
|
||
from bs4 import BeautifulSoup
|
||
from flask import current_app
|
||
from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError
|
||
|
||
from app.utils.io_helpers import safe_write
|
||
from app.utils.enrichment import enrich_url
|
||
from app.utils.settings import get_settings
|
||
from app.logging_setup import get_app_logger
|
||
|
||
# Load settings once for constants / defaults
|
||
settings = get_settings()
|
||
|
||
logger = get_app_logger()
|
||
|
||
class Browser:
    """
    Orchestrates page fetching and analysis. Meant to be accessed via the
    lazily-loaded singleton factory `get_browser()`.
    """

    def __init__(self, storage_dir: Optional[Path] = None) -> None:
        """
        Args:
            storage_dir: Base directory for run artifacts. Defaults to
                settings.sandbox.storage (typically /data) if not provided.
        """
        if storage_dir is None:
            try:
                # Prefer the settings model's configured storage path
                storage_dir = Path(settings.sandbox.storage)
            except Exception:
                storage_dir = Path("/data")

        self.storage_dir: Path = storage_dir

    # -----------------------------------------------------------------------
    # Engine access helpers
    # -----------------------------------------------------------------------
    @staticmethod
    def _get_rule_engine():
        """
        Retrieve the rules engine instance from the Flask application config.

        Returns:
            RuleEngine or None: The engine if available, or None if not configured.
        """
        try:
            return current_app.config.get("RULE_ENGINE")
        except Exception:
            return None

    @staticmethod
    def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]:
        """
        Summarize a list of engine rule result dicts (result = "PASS" | "FAIL").

        Returns:
            {'fail_count': int, 'total_rules': int}
        """
        summary = {"fail_count": 0, "total_rules": 0}
        for item in results:
            summary["total_rules"] += 1
            if str(item.get("result", "")).upper() == "FAIL":
                summary["fail_count"] += 1
        return summary

    def run_rule_checks(self, text: str, category: str) -> Dict[str, Any]:
        """
        Run all rules for a given category against the provided text, returning
        a table-friendly model.

        Args:
            text: Text to analyze (HTML, snippet, etc.)
            category: One of 'form', 'script', 'text' (or any category your rules use)

        Returns:
            {
                "checks": [
                    { "name": str, "description": str, "category": str,
                      "result": "PASS"|"FAIL", "reason": Optional[str],
                      "severity": Optional[str], "tags": Optional[List[str]] }, ...
                ],
                "summary": { "fail_count": int, "total_rules": int }
            }
        """
        out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}}
        engine = self._get_rule_engine()

        if engine is None:
            return out

        try:
            engine_results = engine.run_all(text, category=category)  # list of dicts
            for item in engine_results:
                out["checks"].append({
                    "name": item.get("name"),
                    "description": item.get("description"),
                    "category": item.get("category"),
                    "result": item.get("result"),    # "PASS" | "FAIL"
                    "reason": item.get("reason"),    # present on FAIL by engine design
                    "severity": item.get("severity"),
                    "tags": item.get("tags"),
                })
            out["summary"] = self._summarize_results(out["checks"])
        except Exception as exc:
            # Preserve shape; record the error as a synthetic PASS (so the UI doesn't break)
            out["checks"].append({
                "name": "engine_error",
                "description": "Rule engine failed during evaluation",
                "category": category,
                "result": "PASS",
                "reason": f"{exc}",
                "severity": None,
                "tags": None,
            })
            out["summary"] = {"fail_count": 0, "total_rules": 1}

        return out

    def build_rule_checks_overview(self, full_html_text: str) -> List[Dict[str, Any]]:
        """
        Build a top-level overview for the results page: run each category across
        the entire HTML and group results by category.

        Returns:
            [
                {"category": "script", "results": [ ...engine dicts... ], "summary": {...}},
                {"category": "form", "results": [ ... ], "summary": {...}},
                {"category": "text", "results": [ ... ], "summary": {...}},
            ]
        """
        overview: List[Dict[str, Any]] = []
        engine = self._get_rule_engine()

        for cat in ["script", "form", "text"]:
            block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}}

            if engine is not None:
                try:
                    results = engine.run_all(full_html_text, category=cat)
                    block["results"] = results
                    block["summary"] = self._summarize_results(results)
                except Exception as exc:
                    block["results"] = [{
                        "name": "engine_error",
                        "description": "Rule engine failed during overview evaluation",
                        "category": cat,
                        "result": "PASS",
                        "reason": f"{exc}",
                        "severity": None,
                        "tags": None,
                    }]
                    block["summary"] = {"fail_count": 0, "total_rules": 1}

            overview.append(block)

        return overview

    # -----------------------------------------------------------------------
    # Form & Script analysis (plumbing only; detection is in the rules engine)
    # -----------------------------------------------------------------------
    def analyze_forms(self, html: str, base_url: str = "") -> List[Dict[str, Any]]:
        """
        Collect form artifacts and evaluate per-form matches via the rules engine.
        Only include rows that matched at least one rule.

        Returns a list of dicts with keys (per matched form):
            - type: "form"
            - action, method, inputs
            - content_snippet: str
            - rules: List[{name, description, severity?, tags?}]
        """
        soup = BeautifulSoup(html, "lxml")
        results: List[Dict[str, Any]] = []

        engine = self._get_rule_engine()
        base_hostname = urlparse(base_url).hostname or ""
        # Match how the script pass picks its preview length
        try:
            preview_len = getattr(settings.ui, "snippet_preview_len", 200)  # keep parity with scripts
        except Exception:
            preview_len = 200

        for form in soup.find_all("form"):
            try:
                action = (form.get("action") or "").strip()
                method = (form.get("method") or "get").strip().lower()

                inputs: List[Dict[str, Any]] = []
                for inp in form.find_all("input"):
                    inputs.append({
                        "name": inp.get("name"),
                        "type": (inp.get("type") or "text").strip().lower(),
                    })

                # Use the actual form markup for regex rules
                form_markup = str(form)
                # UI-friendly snippet
                content_snippet = form_markup[:preview_len]

                matches: List[Dict[str, Any]] = []
                if engine is not None:
                    for r in getattr(engine, "rules", []):
                        if getattr(r, "category", None) != "form":
                            continue
                        rtype = getattr(r, "rule_type", None)

                        try:
                            ok = False
                            reason = ""
                            if rtype == "regex":
                                # Run against the raw form HTML
                                ok, reason = r.run(form_markup)
                            elif rtype == "function":
                                # Structured facts for function-style rules
                                facts = {
                                    "category": "form",
                                    "base_url": base_url,
                                    "base_hostname": base_hostname,
                                    "action": action,
                                    "action_hostname": urlparse(action).hostname or "",
                                    "method": method,
                                    "inputs": inputs,
                                    "markup": form_markup,
                                }
                                ok, reason = r.run(facts)
                            else:
                                continue

                            if ok:
                                matches.append({
                                    "name": getattr(r, "name", "unknown_rule"),
                                    "description": (reason or "") or getattr(r, "description", ""),
                                    "severity": getattr(r, "severity", None),
                                    "tags": getattr(r, "tags", None),
                                })
                        except Exception as rule_exc:
                            # Be defensive; a bad rule shouldn't break the form pass
                            try:
                                logger.debug("Form rule error", extra={"rule": getattr(r, "name", "?"), "error": str(rule_exc)})
                            except Exception:
                                pass
                            continue

                if matches:
                    results.append({
                        "type": "form",
                        "action": action,
                        "method": method,
                        "inputs": inputs,
                        "content_snippet": content_snippet,
                        "rules": matches,
                    })

            except Exception as exc:
                # Keep analysis resilient
                try:
                    logger.error("Form analysis error", extra={"error": str(exc)})
                except Exception:
                    pass
                results.append({
                    "type": "form",
                    "heuristics": [f"Form analysis error: {exc}"],
                })

        return results

    def analyze_scripts(self, html: str, base_url: str = "") -> List[Dict[str, Any]]:
        """
        Collect script artifacts and evaluate per-script matches via the rules engine.
        Only include rows that matched at least one rule.
        """
        soup = BeautifulSoup(html, "lxml")
        results: List[Dict[str, Any]] = []

        benign_types = {"application/ld+json", "application/json"}

        engine = self._get_rule_engine()
        base_hostname = urlparse(base_url).hostname or ""

        for script in soup.find_all("script"):
            try:
                src = (script.get("src") or "").strip()
                s_type_attr = (script.get("type") or "").strip().lower()
                inline_text = script.get_text(strip=True) or ""

                if s_type_attr in benign_types:
                    continue

                record: Dict[str, Any] = {}
                if src:
                    record["type"] = "external"
                    record["src"] = src
                elif inline_text:
                    # Respect the UI snippet config
                    preview_len = getattr(settings.ui, "snippet_preview_len", 200)
                    record["type"] = "inline"
                    record["content_snippet"] = (inline_text[:preview_len]).replace("\n", " ")
                else:
                    record["type"] = "unknown"

                matches: List[Dict[str, Any]] = []
                if engine is not None:
                    if inline_text:
                        for r in engine.rules:
                            if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "regex":
                                ok, reason = r.run(inline_text)
                                if ok:
                                    matches.append({
                                        "name": getattr(r, "name", "unknown_rule"),
                                        "description": getattr(r, "description", "") or (reason or ""),
                                        "severity": getattr(r, "severity", None),
                                        "tags": getattr(r, "tags", None),
                                    })

                    if src:
                        facts = {
                            "src": src,
                            "base_url": base_url,
                            "base_hostname": base_hostname,
                            "src_hostname": urlparse(src).hostname or "",
                            "category": "script",
                        }
                        for r in engine.rules:
                            if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "function":
                                ok, reason = r.run(facts)
                                if ok:
                                    matches.append({
                                        "name": getattr(r, "name", "unknown_rule"),
                                        "description": (reason or "") or getattr(r, "description", ""),
                                        "severity": getattr(r, "severity", None),
                                        "tags": getattr(r, "tags", None),
                                    })

                if matches:
                    record["rules"] = matches
                    results.append(record)

            except Exception as exc:
                results.append({
                    "type": "unknown",
                    "heuristics": [f"Script analysis error: {exc}"],
                })

        return results

    # -----------------------------------------------------------------------
    # Fetcher / Orchestrator
    # -----------------------------------------------------------------------
    async def fetch_page_artifacts(self, url: str, fetch_ssl_enabled: bool = False) -> Dict[str, Any]:
        """
        Fetch page artifacts and save them in a UUID-based directory under this
        Browser's storage_dir.

        Writes:
            - /data/<uuid>/screenshot.png
            - /data/<uuid>/source.html
            - /data/<uuid>/results.json (single source of truth for routes)

        Returns:
            A result dict with the keys used by templates (and a future API).
        """
        run_uuid = str(uuid.uuid4())
        run_dir = self.storage_dir / run_uuid
        run_dir.mkdir(parents=True, exist_ok=True)

        screenshot_path = run_dir / "screenshot.png"
        source_path = run_dir / "source.html"
        results_path = run_dir / "results.json"

        redirects: List[Dict[str, Any]] = []
        downloads: List[Dict[str, Any]] = []
        scripts_seen: List[str] = []

        async with async_playwright() as pw:
            browser = await pw.chromium.launch(
                headless=True,
                args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled"],
            )
            context = await browser.new_context(
                viewport={"width": 1920, "height": 1080},
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
                java_script_enabled=True,
                locale="en-US",
            )
            page = await context.new_page()

            # Event handlers (plumbing)
            def _on_response(resp):
                try:
                    if 300 <= resp.status <= 399:
                        redirects.append({"status": resp.status, "url": resp.url})
                except Exception:
                    pass

            def _on_download(d):
                try:
                    downloads.append({"url": d.url, "suggested_filename": d.suggested_filename})
                except Exception:
                    pass

            def _on_request(r):
                try:
                    if r.url.endswith((".js", ".vbs", ".hta")):
                        scripts_seen.append(r.url)
                except Exception:
                    pass

            page.on("response", _on_response)
            page.on("download", _on_download)
            page.on("request", _on_request)

            try:
                await page.goto(url, wait_until="networkidle", timeout=60000)
                final_url = page.url
                await page.screenshot(path=str(screenshot_path), full_page=True)
                html = await page.content()
                safe_write(source_path, html)
            except PWTimeoutError:
                final_url = page.url
                safe_write(source_path, "Page did not fully load (timeout)")
                await page.screenshot(path=str(screenshot_path), full_page=True)

            await context.close()
            await browser.close()

        # Read back the saved source
        html_content = source_path.read_text(encoding="utf-8")

        # Forms analysis (per-form rule checks)
        forms_info = self.analyze_forms(html_content, final_url)

        # Script artifacts (no detection here)
        suspicious_scripts = self.analyze_scripts(html_content, base_url=final_url)

        # Enrichment
        enrichment = enrich_url(url, fetch_ssl_enabled)

        # Global PASS/FAIL table per category (entire document)
        rule_checks_overview = self.build_rule_checks_overview(html_content)

        try:
            for blk in rule_checks_overview:
                current_app.logger.debug(f"[rules] {blk['category']}: {blk['summary']}")
        except Exception:
            pass

        # Assemble the single result dict
        result: Dict[str, Any] = {
            "uuid": run_uuid,
            "submitted_url": url,
            "final_url": final_url,
            "redirects": redirects,
            "downloads": downloads,
            "scripts": scripts_seen,
            "forms": forms_info,
            "suspicious_scripts": suspicious_scripts,
            "rule_checks": rule_checks_overview,  # table-ready for UI
            "enrichment": enrichment,
        }

        # Persist as the single source of truth for routes
        safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False))

        try:
            logger.info(f"Saved results.json for run {run_uuid}")
        except Exception:
            pass

        return result

# ---------------------------------------------------------------------------
# Lazy-loaded singleton factory
# ---------------------------------------------------------------------------

# Prefer importing your project-wide singleton decorator.
try:
    from app.utils.settings import singleton_loader  # if we already export it
except Exception:
    # Local fallback if the import is not available.
    import functools
    from typing import Callable, TypeVar

    T = TypeVar("T")

    def singleton_loader(func: Callable[..., T]) -> Callable[..., T]:
        """Ensure the function only runs once, returning the cached value."""
        cache: dict[str, T] = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            if func.__name__ not in cache:
                cache[func.__name__] = func(*args, **kwargs)
            return cache[func.__name__]

        return wrapper


@singleton_loader
def get_browser(storage_dir: Optional[Path] = None) -> Browser:
    """
    Lazily construct and cache a singleton Browser instance.

    Args:
        storage_dir: Optional override for the artifact base directory.

    Returns:
        Browser: The singleton instance.
    """
    return Browser(storage_dir=storage_dir)