- Add SSL/TLS intelligence pipeline:
- crt.sh lookup with expired-filtering and root-domain wildcard resolution
- live TLS version/cipher probe with weak/legacy flags and probe notes
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data
- Introduce `URLNormalizer` with punycode support and typo repair
- Auto-prepend `https://` for bare domains (e.g., `google.com`)
- Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
- `get_url_normalizer()` reads defaults from Settings (if present)
- Standardize function-rule return shape to `(bool, dict|None)` across
`form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.)
- Harden `FunctionRuleAdapter`:
- Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs
- Adapt non-dict inputs to facts (category-aware and via provided adapter)
- Return `(True, dict)` on match, `(False, None)` on miss
- Bind-time logging with file:line + function id for diagnostics
- `RuleEngine`:
- Back rules by private `self._rules`; `rules` property returns copy
- Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
- Fix AttributeError from property assignment during `__init__`
- Replace hidden singleton factory with explicit builder + global state:
- `app/rules/factory.py::build_rules_engine()` builds and logs totals
- `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the source of truth
- `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts
- Unify logging wiring:
- `wire_logging_once(app)` clears and attaches a single handler chain
- Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
- Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)
- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Documented foreground vs background log “double consumer” caveat (attach vs `compose logs`)
- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section
- Consumers should import the rules engine via:
- `from app.state import get_rules_engine`
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
then publish with `set_rules_engine()`. Do not call old singleton factories.
- New/changed modules (high level):
- `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
- `app/rules/function_rules.py` (±) — normalized payload returns
- `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
- `app/utils/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
- `app/rules/factory.py` (±) — pure builder; totals logged post-registration
- `app/state.py` (+) — process-global rules engine
- `app/logging_setup.py` (±) — single chain, two named loggers
- `app/wsgi.py` (±) — preload build + `set_rules_engine()`
- `entrypoint.sh` (±) — add `--preload`
- templates (±) — TLS card, raw toggle; front-page checkbox
Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.
369 lines
13 KiB
Python
369 lines
13 KiB
Python
"""
|
||
app/rules/function_rules.py
|
||
|
||
Class-based adapters + function-based rules for SneakyScope.
|
||
|
||
Design:
|
||
- FactAdapter: converts text snippets into structured 'facts' dicts by category.
|
||
- FunctionRuleAdapter: wraps a rule function (expects dict facts) so it can be
|
||
used directly by the RuleEngine even when the engine is given strings.
|
||
|
||
Each rule returns (matched: bool, payload: Optional[Dict[str, Any]]).
|
||
If matched is True, 'payload' should explain why (typically via a 'note' key); on a miss it is None.
|
||
|
||
Note:
|
||
- Form rules work today with text snippets, thanks to FunctionRuleAdapter+FactAdapter.
|
||
- Script rules expect per-script dict facts (src/base_hostname/etc.). They are
|
||
registered now and will fully activate when you evaluate per-script contexts.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from typing import Any, Callable, Dict, Optional, Tuple
|
||
import inspect
|
||
import logging
|
||
from urllib.parse import urlparse
|
||
|
||
from app.logging_setup import get_app_logger
|
||
|
||
# Module-wide logger obtained from app.logging_setup; the adapters below use it
# for their warning and exception messages.
app_logger = get_app_logger()
|
||
|
||
# Form `action` values that do not name a real submission target. Consulted by
# form_action_missing (to flag them) and form_submits_to_different_host (to skip them).
_NOOP_ACTIONS = {"", "#", "javascript:void(0)", "javascript:", "about:blank"}
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Adapters
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class FactAdapter:
    """
    Turn raw text/HTML snippets into the structured 'facts' dicts consumed by
    function-based rules.

    Dict inputs are treated as already-structured facts and are handed back as
    the very same object (gaining only a 'category' key when they lack one).
    String inputs are parsed according to the requested category.
    """

    def adapt(self, text_or_facts: Any, category: str = "") -> Dict[str, Any]:
        """
        Produce a facts dict from either a raw snippet or an existing dict.

        Args:
            text_or_facts: A raw string snippet, or a dict of facts.
            category: 'form' | 'script' | 'text' | ... — selects the parser
                used for string snippets.

        Returns:
            A facts dict. For unknown categories or unsupported input types the
            input is wrapped as {'category': category, 'raw': <input>} and a
            warning is logged.
        """
        if isinstance(text_or_facts, dict):
            # Pass-through: only backfill 'category' when it is absent. Note
            # that the caller's dict is updated in place, not copied.
            text_or_facts.setdefault("category", category or "")
            return text_or_facts

        if not isinstance(text_or_facts, str):
            app_logger.warning(f"[FactAdapter] Unsupported input type: {type(text_or_facts)!r}")
            return {"category": category, "raw": text_or_facts}

        if category == "form":
            return self._adapt_form_snippet(text_or_facts)

        if category in ("script", "text"):
            # Script/text snippets are not parsed; script rules expect
            # per-script dicts, so only the raw text is carried along here.
            return {"category": category, "raw": text_or_facts}

        app_logger.warning(f"[FactAdapter] Unknown category '{category}', returning raw snippet.")
        return {"category": category, "raw": text_or_facts}

    # ---- Per-category parsers ----

    def _adapt_form_snippet(self, snippet: str) -> Dict[str, Any]:
        """
        Parse the line-oriented form snippet format, e.g.:

            action=https://example.com/post
            method=post
            inputs=
              - name=email type=text
              - name=password type=password

        Only 'action' and 'method' are extracted; when a key appears on
        several lines the last occurrence wins. 'base_url' and
        'base_hostname' default to empty strings.
        """
        facts: Dict[str, Any] = {"category": "form", "raw": snippet}

        for raw_line in snippet.splitlines():
            stripped = raw_line.strip()
            for key in ("action", "method"):
                prefix = key + "="
                if stripped.startswith(prefix):
                    facts[key] = stripped[len(prefix):].strip()
                    break

        # Context keys the form rules read; left empty unless already set.
        for context_key in ("base_url", "base_hostname"):
            facts.setdefault(context_key, "")
        return facts
|
||
|
||
|
||
class FunctionRuleAdapter:
    """
    Wraps a function-based rule so it ALWAYS returns:
      - match:    (True, Dict[str, Any])
      - no match: (False, None)

    Non-dict inputs are converted to facts through the optional 'adapter',
    which is probed with a duck-typed protocol (first dict result wins):

      1. adapter.build_facts / .facts / .to_facts   called as (category, raw)
      2. adapter.build_<category>_facts / .<category>_facts   called as (raw)
      3. adapter.adapt                               called as (raw, category)

    Step 3 is what lets an adapter exposing only `adapt(text_or_facts,
    category)` be used; it is tried last so adapters implementing the
    earlier entry points keep their existing precedence.
    """

    # Generic adapter entry points, invoked as method(category, raw).
    _GENERIC_METHODS = ("build_facts", "facts", "to_facts")

    def __init__(
        self,
        fn: Callable[[Dict[str, Any]], Any],
        category: str,
        adapter: Optional[Any] = None,
        rule_name: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Args:
            fn: Rule function taking a facts dict.
            category: Rule category; forwarded to the adapter.
            adapter: Optional object used to turn non-dict items into facts.
            rule_name: Name used in log messages; defaults to fn.__name__.
            logger: Logger for diagnostics; falls back to the module-level
                app_logger when omitted.
        """
        self.fn = fn
        self.category = category
        self.adapter = adapter
        self.rule_name = rule_name or getattr(fn, "__name__", "<anonymous>")
        # Honour the caller-supplied logger instead of silently dropping it.
        self.logger = logger if logger is not None else app_logger

    # ---------- helpers ----------

    def _call_adapter(self, meth: str, *args: Any) -> Optional[Dict[str, Any]]:
        """Invoke adapter.<meth>(*args); return its result only if it is a dict."""
        candidate = getattr(self.adapter, meth, None)
        if not callable(candidate):
            return None
        try:
            facts = candidate(*args)
        except Exception as exc:
            # A failing adapter method must not abort evaluation; log it and
            # let the caller try the next entry point.
            self.logger.exception("[Rule] '%s' adapter.%s failed: %s", self.rule_name, meth, exc)
            return None
        return facts if isinstance(facts, dict) else None

    def _adapt_to_facts(self, raw: Any) -> Optional[Dict[str, Any]]:
        """
        Convert whatever the engine passed into a facts dict.

        Dicts are returned as-is. Otherwise the adapter (if any) is probed in
        the order described in the class docstring.

        Returns:
            A facts dict, or None when the item cannot be adapted.
        """
        if isinstance(raw, dict):
            return raw
        if self.adapter is None:
            return None

        attempts = [(meth, (self.category, raw)) for meth in self._GENERIC_METHODS]
        attempts.append((f"build_{self.category}_facts", (raw,)))
        attempts.append((f"{self.category}_facts", (raw,)))
        # Note the reversed argument order of adapt(text_or_facts, category).
        attempts.append(("adapt", (raw, self.category)))

        for meth, args in attempts:
            facts = self._call_adapter(meth, *args)
            if facts is not None:
                return facts

        # No way to adapt
        return None

    def _coerce_return(self, outcome: Any) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """
        Normalize rule function returns.

        Accepted shapes:
            (bool, dict|None)
            (bool, str)        -> {'note': str} on match
            (bool,) or bool    -> (bool, {} on match / None on miss)

        A matching 2-tuple with a payload of any other type is still a match,
        carrying a minimal payload that records repr() of the value. Any other
        shape is logged and treated as no-match.
        """
        if isinstance(outcome, tuple) and len(outcome) == 2:
            matched = bool(outcome[0])
            raw = outcome[1]

            if not matched:
                return False, None
            if raw is None:
                return True, {}  # match with empty payload is fine
            if isinstance(raw, dict):
                return True, raw
            if isinstance(raw, str):
                return True, {"note": raw}

            self.logger.warning("[Rule] '%s' returned payload of invalid type: %s",
                                self.rule_name, type(raw).__name__)
            # Still treat as match but give minimal payload
            return True, {"note": "coerced-invalid-payload", "value_repr": repr(raw)}

        # Legacy: (bool,) or bare bool
        if isinstance(outcome, tuple) and len(outcome) == 1 and isinstance(outcome[0], bool):
            return (True, {}) if outcome[0] else (False, None)
        if isinstance(outcome, bool):
            return (True, {}) if outcome else (False, None)

        # Junk -> no match
        self.logger.warning("[Rule] '%s' returned invalid shape: %s",
                            self.rule_name, type(outcome).__name__)
        return False, None

    # ---------- callable ----------

    def __call__(self, raw: Any) -> Tuple[bool, Optional[Dict[str, Any]]]:
        """
        Apply the wrapped rule to the provided item (raw or facts).

        Items that cannot be adapted to facts, and rule functions that raise,
        are logged and reported as misses rather than propagating.

        Returns:
            (True, dict) on match
            (False, None) on no match
        """
        facts = self._adapt_to_facts(raw)
        if facts is None:
            self.logger.warning("[Rule] '%s' received non-dict facts (%s). Coercing to miss.",
                                self.rule_name, type(raw).__name__)
            return False, None

        try:
            outcome = self.fn(facts)
        except Exception as exc:
            self.logger.exception("[Rule] '%s' raised: %s", self.rule_name, exc)
            return False, None

        return self._coerce_return(outcome)
|
||
|
||
|
||
def _hit(payload: Optional[Dict[str, Any]] = None) -> Tuple[bool, Optional[Dict[str, Any]]]:
|
||
"""
|
||
Standardize a positive match result: (True, dict)
|
||
"""
|
||
if payload is None:
|
||
payload = {}
|
||
return True, payload
|
||
|
||
|
||
def _miss() -> Tuple[bool, Optional[Dict[str, Any]]]:
|
||
"""
|
||
Standardize a negative match result: (False, None)
|
||
"""
|
||
return False, None
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Function-based rules (dict 'facts' expected)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# ---------------- Script rules ----------------
|
||
|
||
def script_src_uses_data_or_blob(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <script> tags whose src is a 'data:' or 'blob:' URL.

    URL schemes are case-insensitive and surrounding whitespace in the
    attribute is not significant, so the check runs on a stripped,
    lower-cased copy of src ('DATA:...' and ' blob:...' are flagged too).

    Args:
        facts: Per-script facts; reads the 'src' key.

    Returns:
        (True, {'scheme', 'src', 'note'}) on match — 'scheme' is lower-case,
        'src' is the value exactly as supplied — otherwise (False, None).
    """
    src = facts.get("src") or ""
    if not isinstance(src, str):
        return _miss()

    normalized = src.strip().lower()
    for scheme in ("data", "blob"):
        if normalized.startswith(scheme + ":"):
            return _hit({
                "scheme": scheme,
                "src": src,
                "note": f"Script src uses {scheme}: URL",
            })
    return _miss()
|
||
|
||
|
||
def script_src_has_dangerous_extension(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <script> tags with dangerous file extensions (e.g., .vbs, .hta).

    The extension is looked for at the end of the URL *path*, so a query
    string or fragment ('evil.vbs?v=1', 'evil.hta#x') no longer hides it.
    The whole lower-cased src is checked as well, which keeps every match
    the plain suffix test used to produce.

    Args:
        facts: Per-script facts; reads the 'src' key.

    Returns:
        (True, {'ext', 'src', 'note'}) on match, otherwise (False, None).
    """
    src = facts.get("src") or ""
    if not isinstance(src, str):
        return _miss()

    lowered = src.strip().lower()
    try:
        path = urlparse(lowered).path
    except ValueError:
        # urlparse rejects some malformed netlocs; fall back to the raw string only.
        path = ""

    candidates = (path, lowered)
    for ext in (".vbs", ".hta"):
        if any(candidate.endswith(ext) for candidate in candidates):
            return _hit({
                "ext": ext,
                "src": src,
                "note": f"External script has dangerous extension ({ext})",
            })
    return _miss()
|
||
|
||
|
||
def script_third_party_host(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags scripts loaded from a different hostname than the page.

    Hostnames are compared after stripping whitespace and lower-casing (the
    same normalization form_submits_to_different_host applies), so
    'Example.com' and 'example.com' are not reported as different hosts.
    Nothing is flagged when either hostname is missing or not a string.

    Args:
        facts: Per-script facts; reads 'base_hostname' and 'src_hostname'.

    Returns:
        (True, {'base_host', 'src_host', 'note'}) with the normalized
        hostnames on match, otherwise (False, None).
    """
    raw_base = facts.get("base_hostname") or ""
    raw_src = facts.get("src_hostname") or ""
    if not isinstance(raw_base, str) or not isinstance(raw_src, str):
        return _miss()

    base_host = raw_base.strip().lower()
    src_host = raw_src.strip().lower()
    if base_host and src_host and base_host != src_host:
        return _hit({
            "base_host": base_host,
            "src_host": src_host,
            "note": f"Third-party script host: {src_host}",
        })
    return _miss()
|
||
|
||
|
||
# ---------------- Form rules ----------------
|
||
|
||
def form_action_missing(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <form> elements with no meaningful action attribute.

    The action is compared against _NOOP_ACTIONS case-insensitively and with
    trailing semicolons ignored, so 'JavaScript:void(0);' and 'javascript:;'
    are recognised alongside the exact lower-case spellings. A missing or
    empty action is also a hit.

    Args:
        facts: Form facts; reads the 'action' key.

    Returns:
        (True, {'action', 'note'}) on match — 'action' is the stripped value
        as supplied — otherwise (False, None).
    """
    raw_action = facts.get("action") or ""
    if not isinstance(raw_action, str):
        return _miss()

    action = raw_action.strip()
    # Canonical form used only for the lookup; the payload keeps the original.
    canonical = action.lower().rstrip(";").rstrip()
    if canonical in _NOOP_ACTIONS:
        return _hit({
            "action": action,
            "note": "Form has no action attribute (or uses a no-op action)",
        })
    return _miss()
|
||
|
||
|
||
def form_http_on_https_page(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags forms whose action is an absolute http:// URL while the page itself
    was loaded over https.

    Args:
        facts: Form facts; reads 'base_url' and 'action'.

    Returns:
        (True, {'base_url', 'action', 'note'}) on match, otherwise
        (False, None). URLs that fail to parse are never flagged.
    """
    page_url = (facts.get("base_url") or "").strip()
    target = (facts.get("action") or "").strip()

    try:
        page_scheme = (urlparse(page_url).scheme or "").lower()
        target_parts = urlparse(target)
        target_scheme = (target_parts.scheme or "").lower()
    except Exception:
        # Parsing trouble: do not flag.
        return _miss()

    # Relative and scheme-less actions have an empty scheme and fall through.
    is_downgrade = page_scheme == "https" and target_scheme == "http"
    if not is_downgrade:
        return _miss()

    return _hit({
        "base_url": page_url,
        "action": target_parts.geturl(),
        "note": "Submits over insecure HTTP"
    })
|
||
|
||
|
||
def form_submits_to_different_host(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Flags <form> actions that name a hostname other than the page's own.

    Args:
        facts: Form facts; reads 'base_hostname' and 'action'.

    Returns:
        (True, {'base_host', 'act_host', 'action', 'note'}) on match,
        otherwise (False, None). Empty/no-op actions, unparseable actions, and
        actions without a host (relative URLs) are never flagged.
    """
    page_host = (facts.get("base_hostname") or "").strip().lower()
    target = (facts.get("action") or "").strip()

    if not target or target in _NOOP_ACTIONS:
        return _miss()

    try:
        target_host = (urlparse(target).hostname or "").lower()
    except Exception:
        return _miss()

    # A comparison is only meaningful when both hosts are known, i.e. the
    # action is an absolute URL or a scheme-less //host/path reference.
    if not (target_host and page_host) or target_host == page_host:
        return _miss()

    return _hit({
        "base_host": page_host,
        "act_host": target_host,
        "action": target,
        "note": "Submits to a different host"
    })