feat(text): add text analysis pipeline & surface results in UI
- engine: add analyse_text() to extract visible page text and evaluate
category="text" rules; collect matched phrases and expose as
`content_snippet` (deduped, length-capped via settings.ui.snippet_preview_len).
- engine: removed unused code
- browser: removed double call for enrichment
- engine: improve regex compilation — honor per-rule flags (string or list)
and default IGNORECASE when category=="text".
- engine: add dispatch logging "[engine] applying categories: …" gated by
settings.app.log_rule_dispatch.
- ui(templates): add `templates/partials/result_text.html` mirroring the forms
table; renders page-level records and their matched rules.
- ui(controller): wire `analyse_text()` into scan path and expose
`payload["suspicious_text"]`.
- rules(text): add `identity_verification_prompt`, `gated_document_access`,
`email_collection_prompt`; broaden `credential_reset`.
fix: text indicators were not displayed due to missing analyzer and mismatched result shape.
Result shape:
suspicious_text: [
{
"type": "page",
"content_snippet": "...matched phrases…",
"rules": [
{"name": "...", "description": "...", "severity": "medium", "tags": ["..."]}
]
}
]
This commit is contained in:
@@ -3,7 +3,8 @@ rules_engine.py
|
||||
"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
import unicodedata
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from pathlib import Path
|
||||
from typing import Callable, Dict, List, Optional, Tuple, Union
|
||||
@@ -11,6 +12,18 @@ from typing import Callable, Dict, List, Optional, Tuple, Union
|
||||
from app.logging_setup import get_engine_logger
|
||||
from app.utils.settings import get_settings
|
||||
|
||||
import re as _re
|
||||
|
||||
FLAG_MAP = {
|
||||
"i": _re.IGNORECASE, "ignorecase": _re.IGNORECASE,
|
||||
"m": _re.MULTILINE, "multiline": _re.MULTILINE,
|
||||
"s": _re.DOTALL, "dotall": _re.DOTALL, "singleline": _re.DOTALL,
|
||||
"x": _re.VERBOSE, "verbose": _re.VERBOSE,
|
||||
"a": _re.ASCII, "ascii": _re.ASCII,
|
||||
"u": _re.UNICODE, "unicode": _re.UNICODE,
|
||||
"l": _re.LOCALE, "locale": _re.LOCALE,
|
||||
}
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
import yaml
|
||||
@@ -49,20 +62,64 @@ class Rule:
|
||||
"""
|
||||
Compile the regex pattern once for performance, if applicable.
|
||||
|
||||
Behavior:
|
||||
- Uses flags specified on the rule (list like ['i','m'] or a string like 'im').
|
||||
- If the rule category is 'text' and no 'i' flag is set, defaults to IGNORECASE.
|
||||
- Stores the compiled object on self._compiled_regex.
|
||||
|
||||
Returns:
|
||||
bool: True if the regex is compiled and ready, False otherwise.
|
||||
"""
|
||||
if getattr(self, "rule_type", None) != "regex" or not getattr(self, "pattern", None):
|
||||
return False
|
||||
|
||||
re_flags = 0
|
||||
|
||||
# Collect flags from the rule, if any (supports "ims" or ["i","m","s"])
|
||||
raw_flags = getattr(self, "flags", None)
|
||||
if isinstance(raw_flags, str):
|
||||
for ch in raw_flags:
|
||||
mapped = FLAG_MAP.get(ch.lower())
|
||||
if mapped is not None:
|
||||
re_flags |= mapped
|
||||
else:
|
||||
logger.warning("[Rule] Unknown regex flag %r on rule '%s'", ch, getattr(self, "name", "?"))
|
||||
elif isinstance(raw_flags, (list, tuple, set)):
|
||||
for fl in raw_flags:
|
||||
key = str(fl).lower()
|
||||
mapped = FLAG_MAP.get(key)
|
||||
if mapped is not None:
|
||||
re_flags |= mapped
|
||||
else:
|
||||
logger.warning("[Rule] Unknown regex flag %r on rule '%s'", fl, getattr(self, "name", "?"))
|
||||
|
||||
# Default IGNORECASE for text rules if not explicitly provided
|
||||
cat = (getattr(self, "category", "") or "").lower().strip()
|
||||
if cat == "text" and not (re_flags & _re.IGNORECASE):
|
||||
re_flags |= _re.IGNORECASE
|
||||
|
||||
try:
|
||||
self._compiled_regex = _re.compile(self.pattern, re_flags)
|
||||
|
||||
# Build a compact flag summary inline (e.g., 'ims' or '-' if none)
|
||||
flag_parts = []
|
||||
if re_flags & _re.IGNORECASE: flag_parts.append("i")
|
||||
if re_flags & _re.MULTILINE: flag_parts.append("m")
|
||||
if re_flags & _re.DOTALL: flag_parts.append("s")
|
||||
if re_flags & _re.VERBOSE: flag_parts.append("x")
|
||||
if re_flags & _re.ASCII: flag_parts.append("a")
|
||||
if re_flags & _re.UNICODE: flag_parts.append("u")
|
||||
if re_flags & _re.LOCALE: flag_parts.append("l")
|
||||
flag_summary = "".join(flag_parts) if flag_parts else "-"
|
||||
|
||||
logger.info("[Rule] Compiled regex for '%s' (flags=%s)", getattr(self, "name", "?"), flag_summary)
|
||||
return True
|
||||
|
||||
except _re.error as rex:
|
||||
self._compiled_regex = None
|
||||
logger.warning("[Rule] Failed to compile regex for '%s': %s", getattr(self, "name", "?"), rex)
|
||||
return False
|
||||
|
||||
if self.rule_type == "regex" and self.pattern:
|
||||
try:
|
||||
self._compiled_regex = re.compile(self.pattern, re.IGNORECASE)
|
||||
logger.debug(f"[Rule] Compiled regex for '{self.name}'")
|
||||
return True
|
||||
except re.error as rex:
|
||||
self._compiled_regex = None
|
||||
logger.warning(f"[Rule] Failed to compile regex for '{self.name}': {rex}")
|
||||
return False
|
||||
return False
|
||||
|
||||
def run(self, text: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
@@ -198,7 +255,7 @@ class RuleEngine:
|
||||
)
|
||||
return
|
||||
|
||||
if settings.app.print_rule_loads:
|
||||
if settings.app.log_rule_loads:
|
||||
logger.info(
|
||||
"[engine] add_rule: %s/%s replace=%s -> count=%d",
|
||||
rule.category, rule.name, bool(replace), len(self._rules)
|
||||
@@ -230,6 +287,14 @@ class RuleEngine:
|
||||
self.add_rule(rules[i], replace=replace)
|
||||
i = i + 1
|
||||
|
||||
def _normalize_for_text_rules(self, s: str) -> str:
|
||||
if not s:
|
||||
return ""
|
||||
s = unicodedata.normalize("NFKC", s)
|
||||
# collapse whitespace; keeps word boundaries sensible
|
||||
s = _re.sub(r"\s+", " ", s).strip()
|
||||
return s
|
||||
|
||||
def run_all(self, text: str, category: Optional[str] = None) -> List[Dict]:
|
||||
"""
|
||||
Run all rules against text.
|
||||
@@ -241,6 +306,30 @@ class RuleEngine:
|
||||
Returns:
|
||||
List of dicts with PASS/FAIL per rule (JSON-serializable).
|
||||
"""
|
||||
|
||||
# --- dispatch visibility --- if set to true, we log applied categories
|
||||
if getattr(settings.app, "log_rule_dispatch", False):
|
||||
all_cats = [r.category for r in self._rules]
|
||||
cat_counts = Counter(all_cats)
|
||||
# Which categories are being applied this run?
|
||||
if category is None:
|
||||
selected_categories = sorted(cat_counts.keys())
|
||||
else:
|
||||
selected_categories = [category]
|
||||
|
||||
# How many rules match the selection?
|
||||
selected_rule_count = sum(1 for r in self._rules if r.category in selected_categories)
|
||||
try:
|
||||
logger.info(
|
||||
"[engine] applying categories: %s | selected_rules=%d | totals=%s",
|
||||
",".join(selected_categories),
|
||||
selected_rule_count,
|
||||
dict(cat_counts),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
# --- end dispatch visibility ---
|
||||
|
||||
results: List[Dict] = []
|
||||
|
||||
index = 0
|
||||
@@ -248,12 +337,20 @@ class RuleEngine:
|
||||
while index < total:
|
||||
rule = self.rules[index]
|
||||
|
||||
# if we are running a text rule, let's normalize the text.
|
||||
if category == "text":
|
||||
text = self._normalize_for_text_rules(text)
|
||||
|
||||
if category is not None and rule.category != category:
|
||||
index = index + 1
|
||||
continue
|
||||
|
||||
matched, reason = rule.run(text)
|
||||
|
||||
# very fine-grained trace per rule:
|
||||
if getattr(settings.app, "log_rule_debug", False):
|
||||
logger.info(f"[engine] eval: cat:{rule.category} - rule:{rule.name} - result: {matched} - reason:{reason}" )
|
||||
|
||||
result_str = "FAIL" if matched else "PASS"
|
||||
reason_to_include: Optional[str]
|
||||
if matched:
|
||||
|
||||
Reference in New Issue
Block a user