Files
SneakyScope/app/rules/function_rules.py
Phillip Tarrant 693f7d67b9 feat: HTTPS auto-normalization; robust TLS intel UI; global rules state; clean logging; preload
- Add SSL/TLS intelligence pipeline:
  - crt.sh lookup with expired-filtering and root-domain wildcard resolution
  - live TLS version/cipher probe with weak/legacy flags and probe notes
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data

- Introduce `URLNormalizer` with punycode support and typo repair
  - Auto-prepend `https://` for bare domains (e.g., `google.com`)
  - Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
  - `get_url_normalizer()` reads defaults from Settings (if present)

- Standardize function-rule return shape to `(bool, dict|None)` across
  `form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.)
- Harden `FunctionRuleAdapter`:
  - Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs
  - Adapt non-dict inputs to facts (category-aware and via provided adapter)
  - Return `(True, dict)` on match, `(False, None)` on miss
  - Bind-time logging with file:line + function id for diagnostics
- `RuleEngine`:
  - Back rules by private `self._rules`; `rules` property returns copy
  - Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
  - Fix AttributeError from property assignment during `__init__`

- Replace hidden singleton factory with explicit builder + global state:
  - `app/rules/factory.py::build_rules_engine()` builds and logs totals
  - `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the single source of truth
  - `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts

- Unify logging wiring:
  - `wire_logging_once(app)` clears and attaches a single handler chain
  - Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
  - Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)

- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Documented foreground vs background log “double consumer” caveat (attach vs `compose logs`)

- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section

- Consumers should import the rules engine via:
  - `from app.state import get_rules_engine`
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
  then publish with `set_rules_engine()`. Do not call old singleton factories.

- New/changed modules (high level):
  - `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
  - `app/rules/function_rules.py` (±) — normalized payload returns
  - `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
  - `app/utils/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
  - `app/rules/factory.py` (±) — pure builder; totals logged post-registration
  - `app/state.py` (+) — process-global rules engine
  - `app/logging_setup.py` (±) — single chain, two named loggers
  - `app/wsgi.py` (±) — preload build + `set_rules_engine()`
  - `entrypoint.sh` (±) — add `--preload`
  - templates (±) — TLS card, raw toggle; front-page checkbox

Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.
2025-08-21 22:05:16 -05:00

369 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
app/rules/function_rules.py
Class-based adapters + function-based rules for SneakyScope.
Design:
- FactAdapter: converts text snippets into structured 'facts' dicts by category.
- FunctionRuleAdapter: wraps a rule function (expects dict facts) so it can be
used directly by the RuleEngine even when the engine is given strings.
Each rule returns (matched: bool, payload: Optional[Dict[str, Any]]).
If matched is True, the payload dict should explain why (typically under a 'note' key); on a miss it is None.
Note:
- Form rules work today with text snippets, thanks to FunctionRuleAdapter+FactAdapter.
- Script rules expect per-script dict facts (src/base_hostname/etc.). They are
registered now and will fully activate when you evaluate per-script contexts.
"""
from __future__ import annotations
from typing import Any, Callable, Dict, Optional, Tuple
import inspect
import logging
from urllib.parse import urlparse
from app.logging_setup import get_app_logger
# Application logger used for every diagnostic emitted by this module.
app_logger = get_app_logger()
# Form 'action' values that the form rules treat as "no real action".
_NOOP_ACTIONS = {"", "#", "javascript:void(0)", "javascript:", "about:blank"}
# ---------------------------------------------------------------------------
# Adapters
# ---------------------------------------------------------------------------
class FactAdapter:
    """
    Turns raw text/html snippets into the structured 'facts' dicts consumed by
    function-based rules.

    Dict inputs are passed through as the same object (a 'category' key is
    added when it is missing). String inputs are parsed according to the
    requested category; anything else is wrapped as-is with a warning.
    """

    def adapt(self, text_or_facts: Any, category: str = "") -> Dict[str, Any]:
        """
        Build a facts dict from a raw snippet or an existing facts dict.

        Args:
            text_or_facts: Raw string snippet, or an already-structured dict.
            category: 'form' | 'script' | 'text' | ... (selects the parser).

        Returns:
            The facts dict. For dict input this is the very same object.
        """
        # Structured input: hand it back, only making sure 'category' exists.
        if isinstance(text_or_facts, dict):
            text_or_facts.setdefault("category", category or "")
            return text_or_facts

        # Neither dict nor str: nothing to parse, wrap it and tell the operator.
        if not isinstance(text_or_facts, str):
            app_logger.warning(f"[FactAdapter] Unsupported input type: {type(text_or_facts)!r}")
            return {"category": category, "raw": text_or_facts}

        if category == "form":
            return self._adapt_form_snippet(text_or_facts)
        if category == "script":
            # Script snippets are not parsed; script rules expect per-script
            # dicts (src/base_hostname/etc.), so only minimal facts are returned.
            return {"category": "script", "raw": text_or_facts}
        if category == "text":
            return {"category": "text", "raw": text_or_facts}

        app_logger.warning(f"[FactAdapter] Unknown category '{category}', returning raw snippet.")
        return {"category": category, "raw": text_or_facts}

    # ---- Per-category parsers ----
    def _adapt_form_snippet(self, snippet: str) -> Dict[str, Any]:
        """
        Parse the simple line-oriented form snippet format, e.g.:

            action=https://example.com/post
            method=post
            inputs=
            - name=email type=text
            - name=password type=password

        Only the 'action' and 'method' lines are extracted; when a key appears
        more than once the last occurrence wins.
        """
        facts: Dict[str, Any] = {"category": "form", "raw": snippet}

        for raw_line in snippet.splitlines():
            entry = raw_line.strip()
            for key in ("action", "method"):
                prefix = key + "="
                if entry.startswith(prefix):
                    facts[key] = entry[len(prefix):].strip()
                    break

        # Context keys the form rules read; left empty here so a caller can
        # fill them in afterwards.
        facts.setdefault("base_url", "")
        facts.setdefault("base_hostname", "")
        return facts
class FunctionRuleAdapter:
    """
    Wraps a function-based rule so it ALWAYS returns:
      - match:    (True, Dict[str, Any])
      - no match: (False, None)

    Also adapts non-dict inputs into facts via a provided 'adapter' using a
    duck-typed protocol, so callers can pass raw items (e.g., strings/nodes).

    Adapter methods are tried in this order; the first one that returns a dict wins:
      1. build_facts / facts / to_facts        called as method(category, raw)
      2. build_<category>_facts / <category>_facts  called as method(raw)
      3. adapt                                  called as method(raw, category)
    """

    # Generic adapter entry points, called as method(category, raw).
    _GENERIC_ADAPTER_METHODS = ("build_facts", "facts", "to_facts")

    def __init__(
        self,
        fn: Callable[[Dict[str, Any]], Any],
        category: str,
        adapter: Optional[Any] = None,
        rule_name: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Args:
            fn: The rule function; receives a facts dict.
            category: Rule category, forwarded to the adapter.
            adapter: Optional object that can turn raw items into facts dicts.
            rule_name: Name used in log messages; defaults to fn.__name__.
            logger: Logger for diagnostics; defaults to the module app logger.
        """
        self.fn = fn
        self.category = category
        self.adapter = adapter
        self.rule_name = rule_name or getattr(fn, "__name__", "<anonymous>")
        # Honor an injected logger instead of silently dropping the argument.
        self.logger = logger if logger is not None else app_logger

    # ---------- helpers ----------
    def _try_adapter(self, meth: str, *args: Any) -> Optional[Dict[str, Any]]:
        """Call adapter.<meth>(*args); return its result only if it is a dict."""
        candidate = getattr(self.adapter, meth, None)
        if not callable(candidate):
            return None
        try:
            facts = candidate(*args)
        except Exception as exc:
            # A broken adapter must not take the rule engine down; log and move on.
            self.logger.exception("[Rule] '%s' adapter.%s failed: %s", self.rule_name, meth, exc)
            return None
        return facts if isinstance(facts, dict) else None

    def _adapt_to_facts(self, raw: Any) -> Optional[Dict[str, Any]]:
        """
        Convert whatever the engine passed into a facts dict.

        Tries the provided adapter using a duck-typed protocol.
        Returns a dict, or None if the item cannot be adapted.
        """
        # Already a dict? Use it.
        if isinstance(raw, dict):
            return raw

        if self.adapter is None:
            return None

        # Preferred generic signatures: method(category, raw)
        for meth in self._GENERIC_ADAPTER_METHODS:
            facts = self._try_adapter(meth, self.category, raw)
            if facts is not None:
                return facts

        # Category-specific fallbacks: build_<category>_facts / <category>_facts
        for meth in (f"build_{self.category}_facts", f"{self.category}_facts"):
            facts = self._try_adapter(meth, raw)
            if facts is not None:
                return facts

        # FactAdapter-style protocol: adapt(raw, category). Tried last so that
        # adapters which already worked through the methods above are unaffected.
        return self._try_adapter("adapt", raw, self.category)

    def _coerce_return(self, outcome: Any) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """
        Normalize rule function returns.

        Accepted shapes:
            (bool, dict|None)
            (bool, str)        -> dict {'note': str} on match
            (bool,) or bool    -> (True, {}) or (False, None)
        On invalid shapes, treat as no-match.
        """
        # Exact 2-tuple
        if isinstance(outcome, tuple) and len(outcome) == 2:
            matched = bool(outcome[0])
            raw = outcome[1]
            if not matched:
                return False, None
            if raw is None:
                return True, {}  # match with empty payload is fine
            if isinstance(raw, dict):
                return True, raw
            if isinstance(raw, str):
                return True, {"note": raw}
            self.logger.warning("[Rule] '%s' returned payload of invalid type: %s",
                                self.rule_name, type(raw).__name__)
            # Still treat as match but give minimal payload
            return True, {"note": "coerced-invalid-payload", "value_repr": repr(raw)}

        # Legacy: (bool,) or bare bool
        if isinstance(outcome, tuple) and len(outcome) == 1 and isinstance(outcome[0], bool):
            return (True, {}) if outcome[0] else (False, None)
        if isinstance(outcome, bool):
            return (True, {}) if outcome else (False, None)

        # Junk -> no match
        self.logger.warning("[Rule] '%s' returned invalid shape: %s",
                            self.rule_name, type(outcome).__name__)
        return False, None

    # ---------- callable ----------
    def __call__(self, raw: Any) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """
        Apply the wrapped rule to the provided item (raw or facts).

        Returns:
            (True, dict) on match
            (False, None) on no match, on un-adaptable input, or when the
            rule function raises (the exception is logged, not propagated).
        """
        facts = self._adapt_to_facts(raw)
        if facts is None:
            self.logger.warning("[Rule] '%s' received non-dict facts (%s). Coercing to miss.",
                                self.rule_name, type(raw).__name__)
            return False, None

        try:
            outcome = self.fn(facts)
        except Exception as exc:
            self.logger.exception("[Rule] '%s' raised: %s", self.rule_name, exc)
            return False, None

        return self._coerce_return(outcome)
def _hit(payload: Optional[Dict[str, Any]] = None) -> Tuple[bool, Optional[Dict[str, Any]]]:
"""
Standardize a positive match result: (True, dict)
"""
if payload is None:
payload = {}
return True, payload
def _miss() -> Tuple[bool, Optional[Dict[str, Any]]]:
"""
Standardize a negative match result: (False, None)
"""
return False, None
# ---------------------------------------------------------------------------
# Function-based rules (dict 'facts' expected)
# ---------------------------------------------------------------------------
# ---------------- Script rules ----------------
def script_src_uses_data_or_blob(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <script> tags whose src is a 'data:' or 'blob:' URL.

    Args:
        facts: Per-script facts; only the 'src' key is read.

    Returns:
        (True, {'scheme', 'src', 'note'}) on match, otherwise (False, None).
        'src' is reported exactly as received; 'scheme' is lowercase.
    """
    src = facts.get("src") or ""
    if not isinstance(src, str):
        return _miss()

    # URL schemes are case-insensitive and a src may carry surrounding
    # whitespace, so compare on a normalized copy ('DATA:' must not slip by).
    normalized = src.strip().lower()
    for scheme in ("data", "blob"):
        if normalized.startswith(scheme + ":"):
            return _hit({
                "scheme": scheme,
                "src": src,
                "note": f"Script src uses {scheme}: URL"
            })
    return _miss()
def script_src_has_dangerous_extension(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <script> tags with dangerous file extensions (e.g., .vbs, .hta).

    The extension is checked against both the whole src string and its URL
    path, so a query string or fragment (e.g. 'x.hta?v=1') does not hide it.

    Args:
        facts: Per-script facts; only the 'src' key is read.

    Returns:
        (True, {'ext', 'src', 'note'}) on match, otherwise (False, None).
    """
    src = facts.get("src") or ""
    if not isinstance(src, str):
        return _miss()

    low = src.strip().lower()
    try:
        path = urlparse(low).path
    except ValueError:
        # Malformed URL (e.g. bad IPv6 literal): fall back to the raw string.
        path = low

    for ext in (".vbs", ".hta"):
        if low.endswith(ext) or path.endswith(ext):
            return _hit({
                "ext": ext,
                "src": src,
                "note": f"External script has dangerous extension ({ext})"
            })
    return _miss()
def script_third_party_host(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags scripts loaded from a different hostname than the page.

    Hostnames are compared case-insensitively and without surrounding
    whitespace, matching how form_submits_to_different_host compares hosts.

    Args:
        facts: Per-script facts; reads 'base_hostname' and 'src_hostname'.

    Returns:
        (True, {'base_host', 'src_host', 'note'}) on match (hosts reported
        in normalized form), otherwise (False, None). Missing or non-string
        hostnames never match.
    """
    base_host = facts.get("base_hostname") or ""
    src_host = facts.get("src_hostname") or ""
    if not (isinstance(base_host, str) and isinstance(src_host, str)):
        return _miss()

    # DNS names are case-insensitive: 'Example.com' and 'example.com' are the
    # same host and must not be reported as third-party.
    base_host = base_host.strip().lower()
    src_host = src_host.strip().lower()

    if base_host and src_host and base_host != src_host:
        return _hit({
            "base_host": base_host,
            "src_host": src_host,
            "note": f"Third-party script host: {src_host}"
        })
    return _miss()
# ---------------- Form rules ----------------
def form_action_missing(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <form> elements with no meaningful action attribute.

    The action is compared against _NOOP_ACTIONS case-insensitively and with
    trailing semicolons ignored, so variants such as 'JavaScript:void(0)' or
    'javascript:void(0);' are recognized as no-ops too.

    Args:
        facts: Form facts; only the 'action' key is read.

    Returns:
        (True, {'action', 'note'}) on match, otherwise (False, None).
        'action' is reported as received (whitespace-trimmed).
    """
    action = (facts.get("action") or "").strip()
    # Normalize only for the comparison; the payload keeps the original text.
    normalized = action.lower().rstrip(";").rstrip()
    if normalized in _NOOP_ACTIONS:
        return _hit({
            "action": action,
            "note": "Form has no action attribute (or uses a no-op action)"
        })
    return _miss()
def form_http_on_https_page(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags forms that submit over HTTP while the page itself was loaded over HTTPS.

    Reads 'base_url' and 'action' from facts. Only an action carrying an
    explicit 'http' scheme is flagged; actions without a scheme never match.
    """
    page_url = (facts.get("base_url") or "").strip()
    target = (facts.get("action") or "").strip()

    try:
        page_scheme = (urlparse(page_url).scheme or "").lower()
        target_parts = urlparse(target)
        target_scheme = (target_parts.scheme or "").lower()
    except Exception:
        # Unparseable URL: stay quiet rather than raise a false alarm.
        return _miss()

    is_downgrade = page_scheme == "https" and target_scheme == "http"
    if not is_downgrade:
        return _miss()

    return _hit({
        "base_url": page_url,
        "action": target_parts.geturl(),
        "note": "Submits over insecure HTTP"
    })
def form_submits_to_different_host(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <form> actions that post to a hostname other than the page's own.

    Reads 'base_hostname' and 'action' from facts. Hosts are compared in
    lowercase. No-op actions, and actions that name no host, never match.
    """
    page_host = (facts.get("base_hostname") or "").strip().lower()
    action = (facts.get("action") or "").strip()

    if action == "" or action in _NOOP_ACTIONS:
        return _miss()

    try:
        target_host = (urlparse(action).hostname or "").lower()
    except Exception:
        return _miss()

    # A comparison is only meaningful when both sides name a host
    # (absolute URL or schemeless //host/path on the action side).
    if not target_host or not page_host or target_host == page_host:
        return _miss()

    return _hit({
        "base_host": page_host,
        "act_host": target_host,
        "action": action,
        "note": "Submits to a different host"
    })