Files
SneakyScope/app/rules/rules_engine.py
Phillip Tarrant 55cd81aec0 feat(text): add text analysis pipeline & surface results in UI
- engine: add analyse_text() to extract visible page text and evaluate
  category="text" rules; collect matched phrases and expose as
  `content_snippet` (deduped, length-capped via settings.ui.snippet_preview_len).
- engine: removed unused code
- browser: removed double call for enrichment
- engine: improve regex compilation — honor per-rule flags (string or list)
  and default IGNORECASE when category=="text".
- engine: add dispatch logging "[engine] applying categories: …" gated by
  settings.app.log_rule_dispatch.
- ui(templates): add `templates/partials/result_text.html` mirroring the forms
  table; renders page-level records and their matched rules.
- ui(controller): wire `analyse_text()` into scan path and expose
  `payload["suspicious_text"]`.
- rules(text): add `identity_verification_prompt`, `gated_document_access`,
  `email_collection_prompt`; broaden `credential_reset`.

fix: text indicators were not displayed due to missing analyzer and mismatched result shape.

Result shape:
  suspicious_text: [
    {
      "type": "page",
      "content_snippet": "...matched phrases…",
      "rules": [
        {"name": "...", "description": "...", "severity": "medium", "tags": ["..."]}
      ]
    }
  ]
2025-08-22 17:18:50 -05:00

450 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
rules_engine.py
"""
import re
import unicodedata
from collections import Counter
from dataclasses import dataclass, asdict, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union
from app.logging_setup import get_engine_logger
from app.utils.settings import get_settings
import re as _re
# Maps both short ("i") and long ("ignorecase") flag spellings from rule
# definitions onto the stdlib `re` flag constants used at compile time.
FLAG_MAP = {
    "i": _re.IGNORECASE, "ignorecase": _re.IGNORECASE,
    "m": _re.MULTILINE, "multiline": _re.MULTILINE,
    "s": _re.DOTALL, "dotall": _re.DOTALL, "singleline": _re.DOTALL,
    "x": _re.VERBOSE, "verbose": _re.VERBOSE,
    "a": _re.ASCII, "ascii": _re.ASCII,
    "u": _re.UNICODE, "unicode": _re.UNICODE,
    "l": _re.LOCALE, "locale": _re.LOCALE,
}

# Loaded once at import time; toggles logging behavior throughout the engine.
settings = get_settings()
import yaml
try:
    # Flask is optional; engine still works without it.
    from flask import current_app, has_app_context
except Exception:
    # Fallback stubs so callers can probe Flask context unconditionally.
    current_app = None  # type: ignore

    def has_app_context() -> bool:  # type: ignore
        return False

logger = get_engine_logger()
@dataclass
class Rule:
    """
    Represents a single detection rule.

    When rule_type == 'regex', 'pattern' must be provided.
    When rule_type == 'function', 'function' must be provided and return
    (matched: bool, reason: str).
    """
    name: str
    description: str
    category: str
    rule_type: str = "regex"
    pattern: Optional[str] = None
    function: Optional[Callable[[str], Tuple[bool, str]]] = None
    severity: Optional[str] = None  # 'low' | 'medium' | 'high' (optional)
    tags: Optional[List[str]] = field(default=None)  # e.g., ['obfuscation', 'phishing'] (optional)
    # Regex flags: compact string like "im" or a list like ["i", "multiline"].
    # Previously this was only reachable via setattr(); declared here so
    # loaders and callers have a real field to populate.
    flags: Optional[Union[str, List[str]]] = None
    # Internal compiled regex cache (not serialized)
    _compiled_regex: Optional[re.Pattern] = field(default=None, repr=False, compare=False)

    def compile_if_needed(self) -> bool:
        """
        Compile the regex pattern once for performance, if applicable.

        Behavior:
        - Uses flags specified on the rule (list like ['i','m'] or a string like 'im').
        - If the rule category is 'text' and no 'i' flag is set, defaults to IGNORECASE.
        - Stores the compiled object on self._compiled_regex.

        Returns:
            bool: True if the regex is compiled and ready, False otherwise.
        """
        if self.rule_type != "regex" or not self.pattern:
            return False

        re_flags = 0
        # Collect flags from the rule, if any (supports "ims" or ["i","m","s"]).
        # A string is read character-by-character; a list may use long names.
        raw_flags = self.flags
        if isinstance(raw_flags, str):
            for ch in raw_flags:
                mapped = FLAG_MAP.get(ch.lower())
                if mapped is not None:
                    re_flags |= mapped
                else:
                    logger.warning("[Rule] Unknown regex flag %r on rule '%s'", ch, getattr(self, "name", "?"))
        elif isinstance(raw_flags, (list, tuple, set)):
            for fl in raw_flags:
                mapped = FLAG_MAP.get(str(fl).lower())
                if mapped is not None:
                    re_flags |= mapped
                else:
                    logger.warning("[Rule] Unknown regex flag %r on rule '%s'", fl, getattr(self, "name", "?"))

        # Default IGNORECASE for text rules if not explicitly provided
        cat = (self.category or "").lower().strip()
        if cat == "text" and not (re_flags & _re.IGNORECASE):
            re_flags |= _re.IGNORECASE

        try:
            self._compiled_regex = _re.compile(self.pattern, re_flags)
            # Build a compact flag summary inline (e.g. 'ims', or '-' if none).
            summary_order = (
                ("i", _re.IGNORECASE), ("m", _re.MULTILINE), ("s", _re.DOTALL),
                ("x", _re.VERBOSE), ("a", _re.ASCII), ("u", _re.UNICODE),
                ("l", _re.LOCALE),
            )
            flag_summary = "".join(ch for ch, fl in summary_order if re_flags & fl) or "-"
            logger.info("[Rule] Compiled regex for '%s' (flags=%s)", getattr(self, "name", "?"), flag_summary)
            return True
        except _re.error as rex:
            self._compiled_regex = None
            logger.warning("[Rule] Failed to compile regex for '%s': %s", getattr(self, "name", "?"), rex)
            return False

    def run(self, text: str) -> Tuple[bool, str]:
        """
        Run the rule on the given text.

        Returns:
            (matched: bool, reason: str)
        """
        if self.rule_type == "regex":
            if not self.pattern:
                logger.warning(f"[Rule] '{self.name}' missing regex pattern.")
                return False, "Invalid rule configuration: missing pattern"
            if self._compiled_regex is None:
                # Lazy compile in case the rule was constructed outside add_rule().
                if not self.compile_if_needed():
                    return False, f"Invalid regex pattern: {self.pattern!r}"
            if self._compiled_regex and self._compiled_regex.search(text):
                # FIX: original f-string concatenated pattern and description
                # with no separator, producing garbled reason text in the UI.
                return True, f"Matched regex '{self.pattern}': {self.description}"
            return False, "No match"

        if self.rule_type == "function":
            if callable(self.function):
                try:
                    matched, reason = self.function(text)
                    if isinstance(matched, bool) and isinstance(reason, str):
                        return matched, reason
                    logger.warning(f"[Rule] '{self.name}' function returned invalid types.")
                    return False, "Invalid function return type; expected (bool, str)"
                except Exception as exc:
                    logger.exception(f"[Rule] '{self.name}' function raised exception.")
                    return False, f"Rule function raised exception: {exc!r}"
            logger.warning(f"[Rule] '{self.name}' has invalid function configuration.")
            return False, "Invalid rule configuration: function not callable"

        logger.warning(f"[Rule] '{self.name}' has unknown type '{self.rule_type}'.")
        return False, f"Invalid rule configuration: unknown type '{self.rule_type}'"
@dataclass
class RuleResult:
    """
    Uniform per-rule outcome for UI/API consumption.

    result is "PASS" or "FAIL" (FAIL == matched True)
    """
    name: str
    description: str
    category: str
    result: str  # "PASS" | "FAIL"
    reason: Optional[str] = None  # populated only when the rule matched
    severity: Optional[str] = None  # mirrors Rule.severity ('low' | 'medium' | 'high')
    tags: Optional[List[str]] = None  # mirrors Rule.tags
class RuleEngine:
    """
    Loads and executes rules against provided text, with Flask-aware logging.
    """

    def __init__(self, rules: Optional[List[Rule]] = None):
        """
        Args:
            rules: Optional initial rule list; each is routed through
                   add_rule() so regex compilation and dedup apply.
        """
        # IMPORTANT: back the `rules` property with a private list.
        self._rules: List[Rule] = []
        self._rule_keys: set = set()
        self._rule_index: Dict[Tuple[str, str], int] = {}
        if rules:
            for rule in rules:
                self.add_rule(rule)  # compiles regex as needed

    def add_rule(self, rule: Rule, replace: bool = False) -> None:
        """
        Add a new rule at runtime; compiles regex if needed and logs failures.

        Idempotent by (category, name):
        - If the same (category, name) is already present:
            * replace=False (default): ignore duplicate and warn.
            * replace=True: replace the existing rule in place and recompile regex.

        Args:
            rule: Rule to add.
            replace: If True, overwrite an existing rule with the same (category, name).
        """
        # Ensure tracking structures exist in case __init__ wasn't updated somewhere.
        if not hasattr(self, "_rule_keys"):
            self._rule_keys = set()
        if not hasattr(self, "_rule_index"):
            self._rule_index = {}
        # Defensive resync of the dedup index with the backing list.
        for i, existing in enumerate(getattr(self, "_rules", [])):
            existing_key = (existing.category, existing.name)
            self._rule_keys.add(existing_key)
            self._rule_index[existing_key] = i

        key = (rule.category, rule.name)
        if key in self._rule_keys:
            if not replace:
                try:
                    logger.warning("[Rules] Duplicate registration ignored: %s/%s", rule.category, rule.name)
                except Exception:
                    pass
                return
            # Replace existing rule in place.
            idx = self._rule_index.get(key)
            if idx is None:
                idx = len(self._rules)
                self._rules.append(rule)
                self._rule_index[key] = idx
            else:
                self._rules[idx] = rule
            if rule.rule_type == "regex" and not rule.compile_if_needed():
                logger.warning(
                    "[Engine] Regex failed when replacing rule '%s' (pattern=%r)",
                    rule.name, getattr(rule, "pattern", None)
                )
            return

        # New rule path.
        self._rules.append(rule)
        self._rule_keys.add(key)
        self._rule_index[key] = len(self._rules) - 1
        if rule.rule_type == "regex" and not rule.compile_if_needed():
            logger.warning(
                "[Engine] Regex failed when adding rule '%s' (pattern=%r)",
                rule.name, getattr(rule, "pattern", None)
            )
        # FIX: use getattr with a default (consistent with the other settings
        # probes) and log *after* the append so `count` is accurate.
        if getattr(settings.app, "log_rule_loads", False):
            logger.info(
                "[engine] add_rule: %s/%s replace=%s -> count=%d",
                rule.category, rule.name, bool(replace), len(self._rules)
            )

    # helper, not used ATM
    def add_rules(self, rules: list[Rule], replace: bool = False) -> None:
        """
        Add many rules safely (idempotent). Uses the same semantics as add_rule.
        """
        for rule in rules:
            self.add_rule(rule, replace=replace)

    def _normalize_for_text_rules(self, s: str) -> str:
        """NFKC-normalize and collapse whitespace so text rules match sanely."""
        if not s:
            return ""
        s = unicodedata.normalize("NFKC", s)
        # collapse whitespace; keeps word boundaries sensible
        return _re.sub(r"\s+", " ", s).strip()

    def run_all(self, text: str, category: Optional[str] = None) -> List[Dict]:
        """
        Run all rules against text.

        Args:
            text: The content to test.
            category: If provided, only evaluate rules that match this category.

        Returns:
            List of dicts with PASS/FAIL per rule (JSON-serializable).
        """
        # --- dispatch visibility --- if set to true, we log applied categories
        if getattr(settings.app, "log_rule_dispatch", False):
            cat_counts = Counter(r.category for r in self._rules)
            # Which categories are being applied this run?
            if category is None:
                selected_categories = sorted(cat_counts.keys())
            else:
                selected_categories = [category]
            # How many rules match the selection?
            selected_rule_count = sum(cat_counts.get(c, 0) for c in selected_categories)
            try:
                logger.info(
                    "[engine] applying categories: %s | selected_rules=%d | totals=%s",
                    ",".join(selected_categories),
                    selected_rule_count,
                    dict(cat_counts),
                )
            except Exception:
                pass
        # --- end dispatch visibility ---

        # FIX: normalize once up front; the original re-normalized inside the
        # loop on every iteration (idempotent but wasted work).
        if category == "text":
            text = self._normalize_for_text_rules(text)

        results: List[Dict] = []
        # FIX: iterate the private list directly; the original indexed the
        # `rules` property, which returns a fresh copy on every access
        # (two full list copies per iteration -> quadratic work).
        for rule in self._rules:
            if category is not None and rule.category != category:
                continue
            matched, reason = rule.run(text)
            # very fine-grained trace per rule:
            if getattr(settings.app, "log_rule_debug", False):
                logger.info(f"[engine] eval: cat:{rule.category} - rule:{rule.name} - result: {matched} - reason:{reason}" )
            rr = RuleResult(
                name=rule.name,
                description=rule.description,
                category=rule.category,
                result="FAIL" if matched else "PASS",
                reason=reason if matched else None,
                severity=rule.severity,
                tags=rule.tags,
            )
            results.append(asdict(rr))

        logger.debug(f"[Engine] Completed evaluation. Returned {len(results)} rule results.")
        return results

    @property
    def rules(self) -> List[Rule]:
        """Read-only view (returns a shallow copy) of registered rules."""
        return list(self._rules)
def load_rules_from_yaml(yaml_file: Union[str, Path]) -> List[Rule]:
    """
    Load rules from a YAML file.

    Supports optional 'severity', 'tags', and 'flags' keys.

    Example YAML:
      - name: suspicious_eval
        description: "Use of eval() in script"
        category: script
        type: regex
        pattern: "\\beval\\("
        severity: medium
        tags: [obfuscation]
        flags: [i, m]

    Returns:
        List[Rule]

    Raises:
        ValueError: If the YAML document is not a list of rule objects.
    """
    rules: List[Rule] = []
    path = Path(yaml_file)
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not isinstance(data, list):
        logger.error("[Loader] Rules YAML must be a list of rule objects.")
        raise ValueError("Rules YAML must be a list of rule objects.")

    for idx, item in enumerate(data):
        # FIX: guard against non-mapping entries; the original would raise
        # AttributeError on `.get` for a stray scalar/list item.
        if not isinstance(item, dict):
            logger.warning(f"[Loader] Skipping invalid rule at index {idx}: not a mapping.")
            continue
        name = item.get("name")
        description = item.get("description")
        category = item.get("category")
        if not name or not description or not category:
            logger.warning(f"[Loader] Skipping invalid rule at index {idx}: missing required fields.")
            continue
        rule = Rule(
            name=name,
            description=description,
            category=category,
            rule_type=item.get("type", "regex"),
            pattern=item.get("pattern"),
            function=None,  # function rules should be registered in code
            severity=item.get("severity"),
            tags=item.get("tags") if isinstance(item.get("tags"), list) else None,
        )
        # FIX: per-rule regex flags were read by Rule.compile_if_needed()
        # (via getattr) but never loaded from YAML, so the feature was dead
        # for YAML-defined rules. Attach dynamically so this works whether
        # or not Rule declares a 'flags' field.
        flags = item.get("flags")
        if flags is not None:
            setattr(rule, "flags", flags)
        rules.append(rule)
    return rules