refactor of browser.py into object model

This commit is contained in:
2025-08-21 10:29:45 -05:00
parent b69c2be85c
commit 05cf23ad67
5 changed files with 527 additions and 529 deletions

View File

@@ -1,511 +0,0 @@
"""
app/browser.py
Page fetcher + analysis orchestrator for SneakyScope.
- Fetches a URL (HTML, redirects, etc.)
- Runs the Suspicious Rules Engine (PASS/FAIL for all rules)
- Writes artifacts (screenshot.png, source.txt, results.json) into /data/<run_uuid>/
- Returns a single 'result' dict suitable for UI and future API
Design notes:
- Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules).
- This module keeps "plumbing" only (fetch, extract, persist).
- Minimal non-detection heuristics remain here (e.g., skip benign script MIME types).
Assumptions:
- Flask app context is active (uses current_app for logger and RULE_ENGINE).
- SANDBOX_STORAGE is configured (default: /data).
- enrich_url(url) returns enrichment dict.
"""
import json
import uuid
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from flask import current_app
from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError
from app.utils.io_helpers import safe_write
from .enrichment import enrich_url
from .utils.settings import get_settings
settings = get_settings()
# ---------------------------------------------------------------------------
# Engine access helpers
# ---------------------------------------------------------------------------
def get_rule_engine():
"""
Retrieve the rules engine instance from the Flask application config.
Returns:
RuleEngine or None: The engine if available, or None if not configured.
"""
try:
return current_app.config.get("RULE_ENGINE")
except Exception:
return None
def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]:
"""
Summarize a list of engine rule result dicts (result = "PASS"|"FAIL").
Returns:
{'fail_count': int, 'total_rules': int}
"""
summary = {"fail_count": 0, "total_rules": 0}
index = 0
total = len(results)
while index < total:
item = results[index]
summary["total_rules"] = summary["total_rules"] + 1
if str(item.get("result", "")).upper() == "FAIL":
summary["fail_count"] = summary["fail_count"] + 1
index = index + 1
return summary
def run_rule_checks(text: str, category: str) -> Dict[str, Any]:
"""
Run all rules for a given category against provided text, returning a table-friendly model.
Args:
text: Text to analyze (HTML, snippet, etc.)
category: One of 'form', 'script', 'text' (or any category your rules use)
Returns:
{
"checks": [
{ "name": str, "description": str, "category": str,
"result": "PASS"|"FAIL", "reason": Optional[str],
"severity": Optional[str], "tags": Optional[List[str]] }, ...
],
"summary": { "fail_count": int, "total_rules": int }
}
"""
out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}}
engine = get_rule_engine()
if engine is None:
return out
try:
engine_results = engine.run_all(text, category=category) # list of dicts
# Normalize explicitly
index = 0
total = len(engine_results)
while index < total:
item = engine_results[index]
normalized = {
"name": item.get("name"),
"description": item.get("description"),
"category": item.get("category"),
"result": item.get("result"), # "PASS" | "FAIL"
"reason": item.get("reason"), # present on FAIL by engine design
"severity": item.get("severity"),
"tags": item.get("tags"),
}
out["checks"].append(normalized)
index = index + 1
out["summary"] = _summarize_results(out["checks"])
except Exception as exc:
# Preserve shape; record the error as a synthetic PASS (so UI doesn't break)
out["checks"].append({
"name": "engine_error",
"description": "Rule engine failed during evaluation",
"category": category,
"result": "PASS",
"reason": f"{exc}",
"severity": None,
"tags": None
})
out["summary"] = {"fail_count": 0, "total_rules": 1}
return out
def build_rule_checks_overview(full_html_text: str) -> List[Dict[str, Any]]:
"""
Build a top-level overview for the results page: runs each category across
the entire HTML and groups results by category.
Returns:
[
{"category": "script", "results": [ ...engine dicts... ], "summary": {...}},
{"category": "form", "results": [ ... ], "summary": {...}},
{"category": "text", "results": [ ... ], "summary": {...}},
]
"""
overview: List[Dict[str, Any]] = []
engine = get_rule_engine()
categories = ["script", "form", "text"]
index = 0
total = len(categories)
while index < total:
cat = categories[index]
block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}}
if engine is not None:
try:
results = engine.run_all(full_html_text, category=cat)
block["results"] = results
block["summary"] = _summarize_results(results)
except Exception as exc:
block["results"] = [{
"name": "engine_error",
"description": "Rule engine failed during overview evaluation",
"category": cat,
"result": "PASS",
"reason": f"{exc}",
"severity": None,
"tags": None
}]
block["summary"] = {"fail_count": 0, "total_rules": 1}
overview.append(block)
index = index + 1
return overview
# ---------------------------------------------------------------------------
# Form & Script analysis (plumbing only; detection is in the rules engine)
# ---------------------------------------------------------------------------
def analyze_forms(html: str, base_url: str) -> List[Dict[str, Any]]:
"""
Parse forms from the page HTML and apply rule-based checks (engine), keeping
only simple plumbing heuristics here (no security logic).
Returns list of dicts with keys:
- action, method, inputs
- flagged (bool), flag_reasons (list[str]), status (str)
- rule_checks: {'checks': [...], 'summary': {...}} (per-form snippet evaluation)
Note:
The 'flagged' value is now purely a legacy visual hint based on simple
heuristics; the authoritative PASS/FAIL details are in rule_checks.
As you migrate heuristics into function rules, this 'flagged' may be
removed entirely.
"""
soup = BeautifulSoup(html, "lxml")
forms_info: List[Dict[str, Any]] = []
page_hostname = urlparse(base_url).hostname
for form in soup.find_all("form"):
action = form.get("action")
method = form.get("method", "get").lower()
# Build explicit inputs list
inputs: List[Dict[str, Any]] = []
for inp in form.find_all("input"):
input_name = inp.get("name")
input_type = inp.get("type", "text")
inputs.append({"name": input_name, "type": input_type})
# Minimal legacy flags (kept for UI continuity; detection lives in engine)
flagged_reasons: List[str] = []
if not action or str(action).strip() == "":
flagged_reasons.append("No action specified")
else:
try:
action_host = urlparse(action).hostname
if not str(action).startswith("/") and action_host != page_hostname:
flagged_reasons.append("Submits to a different host")
except Exception:
pass
try:
if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https":
flagged_reasons.append("Submits over insecure HTTP")
except Exception:
pass
for hidden in form.find_all("input", type="hidden"):
name_value = hidden.get("name") or ""
if "password" in name_value.lower():
flagged_reasons.append("Hidden password field")
flagged = bool(flagged_reasons)
# Serialize a simple form snippet for rule category='form'
snippet_lines = []
snippet_lines.append(f"base_url={base_url}")
snippet_lines.append(f"base_hostname={page_hostname}")
snippet_lines.append(f"action={action}")
snippet_lines.append(f"method={method}")
snippet_lines.append("inputs=")
i = 0
n = len(inputs)
while i < n:
item = inputs[i]
snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}")
i = i + 1
form_snippet = "\n".join(snippet_lines)
# Per-form rule checks (PASS/FAIL list via engine)
rule_checks = run_rule_checks(form_snippet, category="form")
forms_info.append({
"action": action,
"method": method,
"inputs": inputs,
"flagged": flagged,
"flag_reasons": flagged_reasons,
"status": "flagged" if flagged else "possibly safe",
"rule_checks": rule_checks
})
return forms_info
def analyze_scripts(html: str, base_url: str = "") -> List[Dict[str, Any]]:
"""
Collect script artifacts and evaluate per-script matches via the rules engine.
Only include rows that matched at least one rule. Inline scripts are checked
against regex rules using their text; external scripts are checked against
function rules using a small 'facts' dict (src/hosts).
Returns list of dicts like:
{
"type": "external" | "inline" | "unknown",
"src": "...", # for external
"content_snippet": "...", # for inline
"rules": [ { "name": "...", "description": "..." }, ... ]
}
"""
soup = BeautifulSoup(html, "lxml")
results: List[Dict[str, Any]] = []
# Benign MIME types we ignore entirely (non-detection plumbing)
benign_types = {"application/ld+json", "application/json"}
engine = get_rule_engine()
base_hostname = urlparse(base_url).hostname or ""
for script in soup.find_all("script"):
try:
src = (script.get("src") or "").strip()
s_type_attr = (script.get("type") or "").strip().lower()
inline_text = script.get_text(strip=True) or ""
# Skip benign structured data outright (noise control)
if s_type_attr in benign_types:
continue
record: Dict[str, Any] = {}
if src:
record["type"] = "external"
record["src"] = src
elif inline_text:
record["type"] = "inline"
record["content_snippet"] = (inline_text[:settings.ui.snippet_preview_len]).replace("\n", " ")
else:
record["type"] = "unknown"
# --- Per-script evaluation: gather matches from engine rules
matches: List[Dict[str, str]] = []
if engine is not None:
# Inline content → run regex script rules against the text
if inline_text:
for r in engine.rules:
if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "regex":
ok, reason = r.run(inline_text)
if ok:
matches.append({
"name": getattr(r, "name", "unknown_rule"),
"description": getattr(r, "description", "") or (reason or ""),
"severity": getattr(r, "severity", None),
"tags": getattr(r, "tags", None),
})
# External src → run function script rules with facts
if src:
facts = {
"src": src,
"base_url": base_url,
"base_hostname": base_hostname,
"src_hostname": urlparse(src).hostname or "",
"category": "script",
}
for r in engine.rules:
if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "function":
ok, reason = r.run(facts)
if ok:
matches.append({
"name": getattr(r, "name", "unknown_rule"),
"description": (reason or "") or getattr(r, "description", ""),
"severity": getattr(r, "severity", None),
"tags": getattr(r, "tags", None),
})
# Only keep rows that matched at least one rule
if matches:
record["rules"] = matches
results.append(record)
except Exception as exc:
results.append({
"type": "unknown",
"heuristics": [f"Script analysis error: {exc}"]
})
return results
# ---------------------------------------------------------------------------
# Fetcher / Orchestrator
# ---------------------------------------------------------------------------
async def fetch_page_artifacts(url: str, storage_dir: Path) -> Dict[str, Any]:
"""
Fetch page artifacts and save them in a UUID-based directory.
Writes:
- /data/<uuid>/screenshot.png
- /data/<uuid>/source.txt
- /data/<uuid>/results.json (single source of truth for routes)
Returns:
result dict with keys used by templates (and future API).
"""
run_uuid = str(uuid.uuid4())
run_dir = storage_dir / run_uuid
run_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = run_dir / "screenshot.png"
source_path = run_dir / "source.txt"
results_path = run_dir / "results.json"
redirects: List[Dict[str, Any]] = []
downloads: List[Dict[str, Any]] = []
scripts_seen: List[str] = []
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled"]
)
context = await browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
java_script_enabled=True,
locale="en-US"
)
page = await context.new_page()
# Event handlers (plumbing)
def _on_response(resp):
try:
if 300 <= resp.status <= 399:
redirects.append({"status": resp.status, "url": resp.url})
except Exception:
pass
def _on_download(d):
try:
downloads.append({"url": d.url, "suggested_filename": d.suggested_filename})
except Exception:
pass
def _on_request(r):
try:
if r.url.endswith((".js", ".vbs", ".hta")):
scripts_seen.append(r.url)
except Exception:
pass
page.on("response", _on_response)
page.on("download", _on_download)
page.on("request", _on_request)
try:
await page.goto(url, wait_until="networkidle", timeout=60000)
final_url = page.url
await page.screenshot(path=str(screenshot_path), full_page=True)
html = await page.content()
safe_write(source_path, html)
except PWTimeoutError:
final_url = page.url
safe_write(source_path, "Page did not fully load (timeout)")
await page.screenshot(path=str(screenshot_path), full_page=True)
await context.close()
await browser.close()
# Read back saved source
html_content = source_path.read_text(encoding="utf-8")
# Forms analysis (per-form rule checks)
forms_info = analyze_forms(html_content, final_url)
# Scripts artifacts (no detection here)
suspicious_scripts = analyze_scripts(html_content, base_url=final_url)
# Enrichment
enrichment = enrich_url(url)
# Global PASS/FAIL table per category (entire document)
rule_checks_overview = build_rule_checks_overview(html_content)
for blk in rule_checks_overview:
current_app.logger.debug(f"[rules] {blk['category']}: {blk['summary']}")
# Assemble single result dict
result: Dict[str, Any] = {
"uuid": run_uuid,
"submitted_url": url,
"final_url": final_url,
"redirects": redirects,
"downloads": downloads,
"scripts": scripts_seen,
"forms": forms_info,
"suspicious_scripts": suspicious_scripts,
"rule_checks": rule_checks_overview, # table-ready for UI
"enrichment": enrichment
}
# Persist as the single source of truth for routes
safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False))
try:
current_app.logger.info(f"[browser] Saved results.json for run {run_uuid}")
except Exception:
pass
return result
def load_results(storage_dir: Path, run_uuid: str) -> Optional[Dict[str, Any]]:
"""
Load a prior run's results.json from /data/<uuid>/.
Returns:
dict or None
"""
run_dir = storage_dir / run_uuid
results_path = run_dir / "results.json"
if not results_path.exists():
return None
try:
text = results_path.read_text(encoding="utf-8")
data = json.loads(text)
return data
except Exception:
return None

View File

@@ -1,5 +0,0 @@
words:
- "reset password"
- "open document"
- "view document"
- "verify account"

View File

@@ -5,8 +5,9 @@ from pathlib import Path
from datetime import datetime
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, send_file, abort
from .browser import fetch_page_artifacts
from .enrichment import enrich_url
# from .browser import fetch_page_artifacts
from .utils.browser import get_browser
from .utils.enrichment import enrich_url
from .utils.settings import get_settings
from .utils.io_helpers import get_recent_results
@@ -64,9 +65,8 @@ def analyze():
storage.mkdir(parents=True, exist_ok=True)
try:
engine = current_app.config.get("RULE_ENGINE")
result = asyncio.run(fetch_page_artifacts(url, storage))
# result = asyncio.run(fetch_page_artifacts(url, storage))
browser = get_browser()
result = asyncio.run(browser.fetch_page_artifacts(url))
current_app.logger.info(f"[+] Analysis done for {url}")
except Exception as e:
flash(f"Analysis failed: {e}", "error")

522
app/utils/browser.py Normal file
View File

@@ -0,0 +1,522 @@
"""
app/browser.py
Singleton, lazily-loaded page fetcher + analysis orchestrator for SneakyScope.
Responsibilities:
- Fetch a URL (HTML, redirects, etc.)
- Run the Suspicious Rules Engine (PASS/FAIL for all rules)
- Write artifacts (screenshot.png, source.txt, results.json) into /data/<run_uuid>/
- Return a single 'result' dict suitable for UI and future API
Design notes:
- Detection logic (regex/heuristics) lives in the rules engine (YAML/function rules).
- This module keeps "plumbing" only (fetch, extract, persist).
- Minimal non-detection heuristics remain here (e.g., skip benign script MIME types).
Assumptions:
- Flask app context is active (uses current_app for logger and RULE_ENGINE).
- SANDBOX_STORAGE is configured (default: /data).
- enrich_url(url) returns enrichment dict.
"""
from __future__ import annotations
import json
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from flask import current_app
from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError
from app.utils.io_helpers import safe_write
from app.enrichment import enrich_url
from app.utils.settings import get_settings
# Load settings once for constants / defaults
settings = get_settings()
class Browser:
"""
Orchestrates page fetching and analysis. Meant to be accessed via the
lazily-loaded singleton factory `get_browser()`.
"""
def __init__(self, storage_dir: Optional[Path] = None) -> None:
"""
Args:
storage_dir: Base directory for run artifacts. Defaults to settings.sandbox.storage
(typically /data) if not provided.
"""
if storage_dir is None:
try:
# Prefer your settings models configured storage path
storage_dir = Path(settings.sandbox.storage)
except Exception:
storage_dir = Path("/data")
self.storage_dir: Path = storage_dir
# -----------------------------------------------------------------------
# Engine access helpers
# -----------------------------------------------------------------------
@staticmethod
def _get_rule_engine():
"""
Retrieve the rules engine instance from the Flask application config.
Returns:
RuleEngine or None: The engine if available, or None if not configured.
"""
try:
return current_app.config.get("RULE_ENGINE")
except Exception:
return None
@staticmethod
def _summarize_results(results: List[Dict[str, Any]]) -> Dict[str, int]:
"""
Summarize a list of engine rule result dicts (result = "PASS"|"FAIL").
Returns:
{'fail_count': int, 'total_rules': int}
"""
summary = {"fail_count": 0, "total_rules": 0}
index = 0
total = len(results)
while index < total:
item = results[index]
summary["total_rules"] = summary["total_rules"] + 1
if str(item.get("result", "")).upper() == "FAIL":
summary["fail_count"] = summary["fail_count"] + 1
index = index + 1
return summary
def run_rule_checks(self, text: str, category: str) -> Dict[str, Any]:
"""
Run all rules for a given category against provided text, returning a table-friendly model.
Args:
text: Text to analyze (HTML, snippet, etc.)
category: One of 'form', 'script', 'text' (or any category your rules use)
Returns:
{
"checks": [
{ "name": str, "description": str, "category": str,
"result": "PASS"|"FAIL", "reason": Optional[str],
"severity": Optional[str], "tags": Optional[List[str]] }, ...
],
"summary": { "fail_count": int, "total_rules": int }
}
"""
out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}}
engine = self._get_rule_engine()
if engine is None:
return out
try:
engine_results = engine.run_all(text, category=category) # list of dicts
index = 0
total = len(engine_results)
while index < total:
item = engine_results[index]
normalized = {
"name": item.get("name"),
"description": item.get("description"),
"category": item.get("category"),
"result": item.get("result"), # "PASS" | "FAIL"
"reason": item.get("reason"), # present on FAIL by engine design
"severity": item.get("severity"),
"tags": item.get("tags"),
}
out["checks"].append(normalized)
index = index + 1
out["summary"] = self._summarize_results(out["checks"])
except Exception as exc:
# Preserve shape; record the error as a synthetic PASS (so UI doesn't break)
out["checks"].append({
"name": "engine_error",
"description": "Rule engine failed during evaluation",
"category": category,
"result": "PASS",
"reason": f"{exc}",
"severity": None,
"tags": None
})
out["summary"] = {"fail_count": 0, "total_rules": 1}
return out
def build_rule_checks_overview(self, full_html_text: str) -> List[Dict[str, Any]]:
"""
Build a top-level overview for the results page: runs each category across
the entire HTML and groups results by category.
Returns:
[
{"category": "script", "results": [ ...engine dicts... ], "summary": {...}},
{"category": "form", "results": [ ... ], "summary": {...}},
{"category": "text", "results": [ ... ], "summary": {...}},
]
"""
overview: List[Dict[str, Any]] = []
engine = self._get_rule_engine()
categories = ["script", "form", "text"]
index = 0
total = len(categories)
while index < total:
cat = categories[index]
block = {"category": cat, "results": [], "summary": {"fail_count": 0, "total_rules": 0}}
if engine is not None:
try:
results = engine.run_all(full_html_text, category=cat)
block["results"] = results
block["summary"] = self._summarize_results(results)
except Exception as exc:
block["results"] = [{
"name": "engine_error",
"description": "Rule engine failed during overview evaluation",
"category": cat,
"result": "PASS",
"reason": f"{exc}",
"severity": None,
"tags": None
}]
block["summary"] = {"fail_count": 0, "total_rules": 1}
overview.append(block)
index = index + 1
return overview
# -----------------------------------------------------------------------
# Form & Script analysis (plumbing only; detection is in the rules engine)
# -----------------------------------------------------------------------
def analyze_forms(self, html: str, base_url: str) -> List[Dict[str, Any]]:
"""
Parse forms from the page HTML and apply rule-based checks (engine), keeping
only simple plumbing heuristics here (no security logic).
Returns list of dicts with keys:
- action, method, inputs
- flagged (bool), flag_reasons (list[str]), status (str)
- rule_checks: {'checks': [...], 'summary': {...}} (per-form snippet evaluation)
"""
soup = BeautifulSoup(html, "lxml")
forms_info: List[Dict[str, Any]] = []
page_hostname = urlparse(base_url).hostname
for form in soup.find_all("form"):
action = form.get("action")
method = form.get("method", "get").lower()
inputs: List[Dict[str, Any]] = []
for inp in form.find_all("input"):
input_name = inp.get("name")
input_type = inp.get("type", "text")
inputs.append({"name": input_name, "type": input_type})
flagged_reasons: List[str] = []
if not action or str(action).strip() == "":
flagged_reasons.append("No action specified")
else:
try:
action_host = urlparse(action).hostname
if not str(action).startswith("/") and action_host != page_hostname:
flagged_reasons.append("Submits to a different host")
except Exception:
pass
try:
if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https":
flagged_reasons.append("Submits over insecure HTTP")
except Exception:
pass
for hidden in form.find_all("input", type="hidden"):
name_value = hidden.get("name") or ""
if "password" in name_value.lower():
flagged_reasons.append("Hidden password field")
flagged = bool(flagged_reasons)
# Serialize a simple form snippet for rule category='form'
snippet_lines = []
snippet_lines.append(f"base_url={base_url}")
snippet_lines.append(f"base_hostname={page_hostname}")
snippet_lines.append(f"action={action}")
snippet_lines.append(f"method={method}")
snippet_lines.append("inputs=")
i = 0
n = len(inputs)
while i < n:
item = inputs[i]
snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}")
i = i + 1
form_snippet = "\n".join(snippet_lines)
# Per-form rule checks (PASS/FAIL list via engine)
rule_checks = self.run_rule_checks(form_snippet, category="form")
forms_info.append({
"action": action,
"method": method,
"inputs": inputs,
"flagged": flagged,
"flag_reasons": flagged_reasons,
"status": "flagged" if flagged else "possibly safe",
"rule_checks": rule_checks
})
return forms_info
def analyze_scripts(self, html: str, base_url: str = "") -> List[Dict[str, Any]]:
"""
Collect script artifacts and evaluate per-script matches via the rules engine.
Only include rows that matched at least one rule.
"""
soup = BeautifulSoup(html, "lxml")
results: List[Dict[str, Any]] = []
benign_types = {"application/ld+json", "application/json"}
engine = self._get_rule_engine()
base_hostname = urlparse(base_url).hostname or ""
for script in soup.find_all("script"):
try:
src = (script.get("src") or "").strip()
s_type_attr = (script.get("type") or "").strip().lower()
inline_text = script.get_text(strip=True) or ""
if s_type_attr in benign_types:
continue
record: Dict[str, Any] = {}
if src:
record["type"] = "external"
record["src"] = src
elif inline_text:
# respect your UI snippet config
preview_len = getattr(settings.ui, "snippet_preview_len", 200)
record["type"] = "inline"
record["content_snippet"] = (inline_text[:preview_len]).replace("\n", " ")
else:
record["type"] = "unknown"
matches: List[Dict[str, Any]] = []
if engine is not None:
if inline_text:
for r in engine.rules:
if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "regex":
ok, reason = r.run(inline_text)
if ok:
matches.append({
"name": getattr(r, "name", "unknown_rule"),
"description": getattr(r, "description", "") or (reason or ""),
"severity": getattr(r, "severity", None),
"tags": getattr(r, "tags", None),
})
if src:
facts = {
"src": src,
"base_url": base_url,
"base_hostname": base_hostname,
"src_hostname": urlparse(src).hostname or "",
"category": "script",
}
for r in engine.rules:
if getattr(r, "category", None) == "script" and getattr(r, "rule_type", None) == "function":
ok, reason = r.run(facts)
if ok:
matches.append({
"name": getattr(r, "name", "unknown_rule"),
"description": (reason or "") or getattr(r, "description", ""),
"severity": getattr(r, "severity", None),
"tags": getattr(r, "tags", None),
})
if matches:
record["rules"] = matches
results.append(record)
except Exception as exc:
results.append({
"type": "unknown",
"heuristics": [f"Script analysis error: {exc}"]
})
return results
# -----------------------------------------------------------------------
# Fetcher / Orchestrator
# -----------------------------------------------------------------------
async def fetch_page_artifacts(self, url: str) -> Dict[str, Any]:
"""
Fetch page artifacts and save them in a UUID-based directory for this Browser's storage_dir.
Writes:
- /data/<uuid>/screenshot.png
- /data/<uuid>/source.txt
- /data/<uuid>/results.json (single source of truth for routes)
Returns:
result dict with keys used by templates (and future API).
"""
run_uuid = str(uuid.uuid4())
run_dir = self.storage_dir / run_uuid
run_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = run_dir / "screenshot.png"
source_path = run_dir / "source.txt"
results_path = run_dir / "results.json"
redirects: List[Dict[str, Any]] = []
downloads: List[Dict[str, Any]] = []
scripts_seen: List[str] = []
async with async_playwright() as pw:
browser = await pw.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled"]
)
context = await browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
java_script_enabled=True,
locale="en-US"
)
page = await context.new_page()
# Event handlers (plumbing)
def _on_response(resp):
try:
if 300 <= resp.status <= 399:
redirects.append({"status": resp.status, "url": resp.url})
except Exception:
pass
def _on_download(d):
try:
downloads.append({"url": d.url, "suggested_filename": d.suggested_filename})
except Exception:
pass
def _on_request(r):
try:
if r.url.endswith((".js", ".vbs", ".hta")):
scripts_seen.append(r.url)
except Exception:
pass
page.on("response", _on_response)
page.on("download", _on_download)
page.on("request", _on_request)
try:
await page.goto(url, wait_until="networkidle", timeout=60000)
final_url = page.url
await page.screenshot(path=str(screenshot_path), full_page=True)
html = await page.content()
safe_write(source_path, html)
except PWTimeoutError:
final_url = page.url
safe_write(source_path, "Page did not fully load (timeout)")
await page.screenshot(path=str(screenshot_path), full_page=True)
await context.close()
await browser.close()
# Read back saved source
html_content = source_path.read_text(encoding="utf-8")
# Forms analysis (per-form rule checks)
forms_info = self.analyze_forms(html_content, final_url)
# Scripts artifacts (no detection here)
suspicious_scripts = self.analyze_scripts(html_content, base_url=final_url)
# Enrichment
enrichment = enrich_url(url)
# Global PASS/FAIL table per category (entire document)
rule_checks_overview = self.build_rule_checks_overview(html_content)
try:
for blk in rule_checks_overview:
current_app.logger.debug(f"[rules] {blk['category']}: {blk['summary']}")
except Exception:
pass
# Assemble single result dict
result: Dict[str, Any] = {
"uuid": run_uuid,
"submitted_url": url,
"final_url": final_url,
"redirects": redirects,
"downloads": downloads,
"scripts": scripts_seen,
"forms": forms_info,
"suspicious_scripts": suspicious_scripts,
"rule_checks": rule_checks_overview, # table-ready for UI
"enrichment": enrichment
}
# Persist as the single source of truth for routes
safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False))
try:
current_app.logger.info(f"[browser] Saved results.json for run {run_uuid}")
except Exception:
pass
return result
# ---------------------------------------------------------------------------
# Lazy-loaded singleton factory
# ---------------------------------------------------------------------------
# Prefer importing your project-wide singleton decorator.
try:
from app.utils.settings import singleton_loader # if we already export it
except Exception:
# Local fallback if import is not available.
from typing import Callable, TypeVar
import functools
T = TypeVar("T")
def singleton_loader(func: Callable[..., T]) -> Callable[..., T]:
"""Ensure the function only runs once, returning the cached value."""
cache: dict[str, T] = {}
@functools.wraps(func)
def wrapper(*args, **kwargs) -> T:
if func.__name__ not in cache:
cache[func.__name__] = func(*args, **kwargs)
return cache[func.__name__]
return wrapper
@singleton_loader
def get_browser(storage_dir: Optional[Path] = None) -> Browser:
"""
Lazily construct and cache a singleton Browser instance.
Args:
storage_dir: Optional override for artifact base directory.
Returns:
Browser: The singleton instance.
"""
return Browser(storage_dir=storage_dir)

View File

@@ -19,14 +19,6 @@ logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
cache = get_cache("/data/cache.db")
settings = get_settings()
# Load BEC words
BEC_WORDS_FILE = Path(__file__).parent.parent / "config" / "bec_words.yaml"
if BEC_WORDS_FILE.exists():
with open(BEC_WORDS_FILE, "r", encoding="utf-8") as f:
BEC_WORDS = yaml.safe_load(f).get("words", [])
else:
BEC_WORDS = []
# 24 hours * 60 minutes
days = 24 * 60