Files
SneakyScope/app/rules/rules_engine.py
Phillip Tarrant 693f7d67b9 feat: HTTPS auto-normalization; robust TLS intel UI; global rules state; clean logging; preload
- Add SSL/TLS intelligence pipeline:
  - crt.sh lookup with expired-filtering and root-domain wildcard resolution
  - live TLS version/cipher probe with weak/legacy flags and probe notes
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data

- Introduce `URLNormalizer` with punycode support and typo repair
  - Auto-prepend `https://` for bare domains (e.g., `google.com`)
  - Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
  - `get_url_normalizer()` reads defaults from Settings (if present)

- Standardize function-rule return shape to `(bool, dict|None)` across
  `form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.)
- Harden `FunctionRuleAdapter`:
  - Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs
  - Adapt non-dict inputs to facts (category-aware and via provided adapter)
  - Return `(True, dict)` on match, `(False, None)` on miss
  - Bind-time logging with file:line + function id for diagnostics
- `RuleEngine`:
  - Back rules by private `self._rules`; `rules` property returns copy
  - Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
  - Fix AttributeError from property assignment during `__init__`

- Replace hidden singleton factory with explicit builder + global state:
  - `app/rules/factory.py::build_rules_engine()` builds and logs totals
  - `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the single source of truth
  - `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts

- Unify logging wiring:
  - `wire_logging_once(app)` clears and attaches a single handler chain
  - Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
  - Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)

- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Documented foreground vs background log “double consumer” caveat (attach vs `compose logs`)

- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section

- Consumers should import the rules engine via:
  - `from app.state import get_rules_engine`
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
  then publish with `set_rules_engine()`. Do not call old singleton factories.

- New/changed modules (high level):
  - `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
  - `app/rules/function_rules.py` (±) — normalized payload returns
  - `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
  - `app/rules/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
  - `app/rules/factory.py` (±) — pure builder; totals logged post-registration
  - `app/state.py` (+) — process-global rules engine
  - `app/logging_setup.py` (±) — single chain, two named loggers
  - `app/wsgi.py` (±) — preload build + `set_rules_engine()`
  - `entrypoint.sh` (±) — add `--preload`
  - templates (±) — TLS card, raw toggle; front-page checkbox

Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.
2025-08-21 22:05:16 -05:00

353 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
rules_engine.py
"""
import re
import logging
from dataclasses import dataclass, asdict, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union
from app.logging_setup import get_engine_logger
from app.utils.settings import get_settings
settings = get_settings()
import yaml
try:
# Flask is optional; engine still works without it.
from flask import current_app, has_app_context
except Exception:
current_app = None # type: ignore
def has_app_context() -> bool: # type: ignore
return False
logger = get_engine_logger()
@dataclass
class Rule:
    """
    Represents a single detection rule.

    When rule_type == 'regex', 'pattern' must be provided.
    When rule_type == 'function', 'function' must be provided and return
    (matched: bool, reason: str).
    """

    name: str
    description: str
    category: str
    rule_type: str = "regex"
    pattern: Optional[str] = None
    function: Optional[Callable[[str], Tuple[bool, str]]] = None
    severity: Optional[str] = None  # 'low' | 'medium' | 'high' (optional)
    tags: Optional[List[str]] = field(default=None)  # e.g., ['obfuscation', 'phishing'] (optional)

    # Internal compiled regex cache; excluded from repr and from equality comparisons.
    _compiled_regex: Optional[re.Pattern] = field(default=None, repr=False, compare=False)

    def compile_if_needed(self) -> bool:
        """
        Compile the regex pattern (case-insensitive) and cache it on the rule.

        Returns:
            bool: True if the regex is compiled and ready, False otherwise
            (non-regex rule, missing pattern, or invalid pattern).
        """
        if self.rule_type != "regex" or not self.pattern:
            return False

        try:
            self._compiled_regex = re.compile(self.pattern, re.IGNORECASE)
        except re.error as rex:
            # Drop any stale compiled pattern so run() reports the failure.
            self._compiled_regex = None
            logger.warning("[Rule] Failed to compile regex for '%s': %s", self.name, rex)
            return False

        logger.debug("[Rule] Compiled regex for '%s'", self.name)
        return True

    def run(self, text: str) -> Tuple[bool, str]:
        """
        Run the rule on the given text.

        Args:
            text: The content to evaluate.

        Returns:
            (matched: bool, reason: str). Configuration problems and exceptions
            raised by a rule function are reported as (False, <explanation>).
        """
        if self.rule_type == "regex":
            return self._run_regex(text)
        if self.rule_type == "function":
            return self._run_function(text)

        logger.warning("[Rule] '%s' has unknown type '%s'.", self.name, self.rule_type)
        return False, f"Invalid rule configuration: unknown type '{self.rule_type}'"

    def _run_regex(self, text: str) -> Tuple[bool, str]:
        """Evaluate a regex rule, compiling the pattern on first use."""
        if not self.pattern:
            logger.warning("[Rule] '%s' missing regex pattern.", self.name)
            return False, "Invalid rule configuration: missing pattern"

        if self._compiled_regex is None and not self.compile_if_needed():
            return False, f"Invalid regex pattern: {self.pattern!r}"

        if self._compiled_regex is not None and self._compiled_regex.search(text):
            # Keep the pattern and the human-readable description visually separated.
            return True, f"Matched regex '{self.pattern}' - {self.description}"
        return False, "No match"

    def _run_function(self, text: str) -> Tuple[bool, str]:
        """Evaluate a function rule and validate the shape of its return value."""
        if not callable(self.function):
            logger.warning("[Rule] '%s' has invalid function configuration.", self.name)
            return False, "Invalid rule configuration: function not callable"

        try:
            # Unpacking stays inside the try: a wrong-arity return is reported, not raised.
            matched, reason = self.function(text)
        except Exception as exc:
            logger.exception("[Rule] '%s' function raised exception.", self.name)
            return False, f"Rule function raised exception: {exc!r}"

        if isinstance(matched, bool) and isinstance(reason, str):
            return matched, reason

        logger.warning("[Rule] '%s' function returned invalid types.", self.name)
        return False, "Invalid function return type; expected (bool, str)"
@dataclass
class RuleResult:
    """
    Outcome of evaluating one rule, in a uniform shape for UI/API consumers.

    `result` is either "PASS" or "FAIL"; "FAIL" means the rule matched.
    """

    # Identity of the rule that produced this outcome.
    name: str
    description: str
    category: str

    # "PASS" | "FAIL"
    result: str

    # Optional details; all default to None.
    reason: Optional[str] = field(default=None)
    severity: Optional[str] = field(default=None)
    tags: Optional[List[str]] = field(default=None)
class RuleEngine:
    """
    Holds a registry of rules and executes them against provided text.

    Rules are unique per (category, name) pair. The registry lives in the
    private `_rules` list; the public `rules` property returns a copy.
    """

    def __init__(self, rules: Optional[List[Rule]] = None):
        """
        Args:
            rules: Optional initial rule list.
        """
        # IMPORTANT: back the read-only `rules` property with a private list.
        self._rules: List[Rule] = []
        self._rule_keys = set()
        self._rule_index = {}

        # Route initial rules through add_rule so dedup and regex compilation happen.
        if rules:
            for rule in rules:
                self.add_rule(rule)

    def _ensure_tracking(self) -> None:
        """Rebuild the dedup structures from `_rules` if any of them is missing."""
        if (
            hasattr(self, "_rules")
            and hasattr(self, "_rule_keys")
            and hasattr(self, "_rule_index")
        ):
            return

        if not hasattr(self, "_rules"):
            self._rules = []
        self._rule_keys = set()
        self._rule_index = {}
        for position, existing in enumerate(self._rules):
            existing_key = (existing.category, existing.name)
            self._rule_keys.add(existing_key)
            self._rule_index[existing_key] = position

    @staticmethod
    def _compile_regex(rule: Rule, action: str) -> None:
        """Compile a regex rule and warn on failure; `action` is 'adding' or 'replacing'."""
        if rule.rule_type != "regex":
            return
        if not rule.compile_if_needed():
            logger.warning(
                "[Engine] Regex failed when %s rule '%s' (pattern=%r)",
                action, rule.name, getattr(rule, "pattern", None)
            )

    def add_rule(self, rule: Rule, replace: bool = False) -> None:
        """
        Add a new rule at runtime; compiles regex if needed and logs failures.

        Idempotent by (category, name):
        - If the same (category, name) is already present:
            * replace=False (default): ignore duplicate and warn.
            * replace=True: replace the existing rule in place and recompile regex.

        Args:
            rule: Rule to add.
            replace: If True, overwrite an existing rule with the same (category, name).
        """
        # Ensure tracking structures exist in case __init__ wasn't run for this instance.
        self._ensure_tracking()

        key = (rule.category, rule.name)

        if key in self._rule_keys:
            if not replace:
                logger.warning(
                    "[Rules] Duplicate registration ignored: %s/%s",
                    rule.category, rule.name
                )
                return

            # Replace existing rule in place, keeping its position in the list.
            idx = self._rule_index.get(key)
            if idx is None:
                idx = len(self._rules)
                self._rules.append(rule)
                self._rule_index[key] = idx
            else:
                self._rules[idx] = rule
            self._compile_regex(rule, "replacing")
            return

        # New rule path
        self._rules.append(rule)
        self._rule_keys.add(key)
        self._rule_index[key] = len(self._rules) - 1

        # Log after registration so the reported count includes this rule.
        if settings.app.print_rule_loads:
            logger.info(
                "[engine] add_rule: %s/%s replace=%s -> count=%d",
                rule.category, rule.name, bool(replace), len(self._rules)
            )

        self._compile_regex(rule, "adding")

    # helper, not used ATM
    def add_rules(self, rules: List[Rule], replace: bool = False) -> None:
        """
        Add many rules safely (idempotent). Uses the same semantics as add_rule.

        Args:
            rules: Rules to add, in order.
            replace: Forwarded to add_rule for every rule.
        """
        for rule in rules:
            self.add_rule(rule, replace=replace)

    def run_all(self, text: str, category: Optional[str] = None) -> List[Dict]:
        """
        Run all rules against text.

        Args:
            text: The content to test.
            category: If provided, only evaluate rules that match this category.

        Returns:
            List of dicts with PASS/FAIL per rule (JSON-serializable).
        """
        results: List[Dict] = []

        # `self.rules` copies the list, so take one snapshot rather than
        # re-reading the property on every iteration.
        for rule in self.rules:
            if category is not None and rule.category != category:
                continue

            matched, reason = rule.run(text)
            outcome = RuleResult(
                name=rule.name,
                description=rule.description,
                category=rule.category,
                result="FAIL" if matched else "PASS",
                # A reason is only reported for rules that matched.
                reason=reason if matched else None,
                severity=rule.severity,
                tags=rule.tags,
            )
            results.append(asdict(outcome))

        logger.debug("[Engine] Completed evaluation. Returned %d rule results.", len(results))
        return results

    @property
    def rules(self) -> List[Rule]:
        """Read-only view (returns a shallow copy) of registered rules."""
        return list(self._rules)
def load_rules_from_yaml(yaml_file: Union[str, Path]) -> List[Rule]:
    """
    Load rules from a YAML file.

    Supports optional 'severity' and 'tags' keys. Entries that are not
    mappings, or that lack 'name', 'description' or 'category', are skipped
    with a warning.

    Example YAML:
        - name: suspicious_eval
          description: "Use of eval() in script"
          category: script
          type: regex
          pattern: "\\beval\\("
          severity: medium
          tags: [obfuscation]

    Args:
        yaml_file: Path to the YAML rules file.

    Returns:
        List[Rule]

    Raises:
        ValueError: If the top-level YAML document is not a list.
    """
    with Path(yaml_file).open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    if not isinstance(data, list):
        logger.error("[Loader] Rules YAML must be a list of rule objects.")
        raise ValueError("Rules YAML must be a list of rule objects.")

    rules: List[Rule] = []
    for idx, item in enumerate(data):
        # A scalar or nested list has no .get(); skip it like any other invalid entry.
        if not isinstance(item, dict):
            logger.warning(
                "[Loader] Skipping invalid rule at index %d: expected a mapping, got %s.",
                idx, type(item).__name__
            )
            continue

        name = item.get("name")
        description = item.get("description")
        category = item.get("category")
        if not name or not description or not category:
            logger.warning("[Loader] Skipping invalid rule at index %d: missing required fields.", idx)
            continue

        tags = item.get("tags")
        rules.append(
            Rule(
                name=name,
                description=description,
                category=category,
                rule_type=item.get("type", "regex"),
                pattern=item.get("pattern"),
                function=None,  # function rules should be registered in code
                severity=item.get("severity"),
                tags=tags if isinstance(tags, list) else None,
            )
        )

    return rules