"""
rules_engine.py

Rule model (`Rule`, `RuleResult`), an in-memory `RuleEngine` that evaluates
rules against text, and a YAML loader for regex rule definitions.
"""

import re
import re as _re  # alias kept so existing `_re.` references elsewhere keep working
import unicodedata
from collections import Counter
from dataclasses import dataclass, asdict, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union

import yaml

from app.logging_setup import get_engine_logger
from app.utils.settings import get_settings

# Accepted flag spellings (short and long) mapped to `re` module flags.
FLAG_MAP = {
    "i": _re.IGNORECASE, "ignorecase": _re.IGNORECASE,
    "m": _re.MULTILINE, "multiline": _re.MULTILINE,
    "s": _re.DOTALL, "dotall": _re.DOTALL, "singleline": _re.DOTALL,
    "x": _re.VERBOSE, "verbose": _re.VERBOSE,
    "a": _re.ASCII, "ascii": _re.ASCII,
    "u": _re.UNICODE, "unicode": _re.UNICODE,
    "l": _re.LOCALE, "locale": _re.LOCALE,
}

# (flag, letter) pairs in the order used when summarising flags in log output.
_FLAG_SUMMARY_ORDER = (
    (_re.IGNORECASE, "i"),
    (_re.MULTILINE, "m"),
    (_re.DOTALL, "s"),
    (_re.VERBOSE, "x"),
    (_re.ASCII, "a"),
    (_re.UNICODE, "u"),
    (_re.LOCALE, "l"),
)

settings = get_settings()

try:
    # Flask is optional; engine still works without it.
    from flask import current_app, has_app_context
except Exception:
    current_app = None  # type: ignore

    def has_app_context() -> bool:  # type: ignore
        return False

logger = get_engine_logger()


def _parse_regex_flags(raw_flags, rule_name: str) -> int:
    """
    Translate a rule's flag specification into a combined `re` flag value.

    Args:
        raw_flags: None, a string, or a list/tuple/set of flag names. A string
            is first looked up as a whole word in FLAG_MAP (e.g. "multiline");
            if that fails it is read one character at a time (e.g. "im").
            Any other type is ignored.
        rule_name: Rule name used in warning messages.

    Returns:
        int: Bitwise OR of all recognised flags (0 when none).
    """
    if isinstance(raw_flags, str):
        whole_word = FLAG_MAP.get(raw_flags.strip().lower())
        if whole_word is not None:
            return int(whole_word)
        tokens = list(raw_flags)
    elif isinstance(raw_flags, (list, tuple, set)):
        tokens = list(raw_flags)
    else:
        return 0

    combined = 0
    for token in tokens:
        mapped = FLAG_MAP.get(str(token).lower())
        if mapped is None:
            logger.warning("[Rule] Unknown regex flag %r on rule '%s'", token, rule_name)
        else:
            combined |= mapped
    return combined


def _flag_summary(re_flags: int) -> str:
    """Return a compact flag string such as 'ims', or '-' when no flag is set."""
    letters = [letter for flag, letter in _FLAG_SUMMARY_ORDER if re_flags & flag]
    return "".join(letters) or "-"


@dataclass
class Rule:
    """
    Represents a single detection rule.

    When rule_type == 'regex', 'pattern' must be provided.
    When rule_type == 'function', 'function' must be provided and return
    (matched: bool, reason: str).
    """
    name: str
    description: str
    category: str
    rule_type: str = "regex"
    pattern: Optional[str] = None
    function: Optional[Callable[[str], Tuple[bool, str]]] = None
    severity: Optional[str] = None  # 'low' | 'medium' | 'high' (optional)
    tags: Optional[List[str]] = field(default=None)  # e.g., ['obfuscation', 'phishing'] (optional)

    # Internal compiled regex cache (not serialized)
    _compiled_regex: Optional[re.Pattern] = field(default=None, repr=False, compare=False)

    # Regex flags: a string like 'im' / 'multiline' or a list like ['i', 'm'].
    # Declared last so existing positional construction is unaffected.
    flags: Optional[Union[str, List[str]]] = field(default=None)

    def compile_if_needed(self) -> bool:
        """
        Compile the regex pattern and cache it on the rule, if applicable.

        Behavior:
          - Uses flags specified on the rule (list like ['i','m'] or a string like 'im').
          - If the rule category is 'text', IGNORECASE is always applied.
          - Stores the compiled object on self._compiled_regex (None on failure).

        Returns:
            bool: True if the regex is compiled and ready, False otherwise
            (not a regex rule, no pattern, or the pattern/flags are invalid).
        """
        if self.rule_type != "regex" or not self.pattern:
            return False

        re_flags = _parse_regex_flags(self.flags, self.name)

        # Default IGNORECASE for text rules if not explicitly provided
        category = (self.category or "").lower().strip()
        if category == "text":
            re_flags |= _re.IGNORECASE

        try:
            self._compiled_regex = _re.compile(self.pattern, re_flags)
        except (_re.error, ValueError, TypeError) as exc:
            # re.error: bad pattern syntax.
            # ValueError: incompatible flags (ASCII+UNICODE, LOCALE on a str pattern).
            # TypeError: pattern is not a string (e.g. a numeric YAML scalar).
            self._compiled_regex = None
            logger.warning("[Rule] Failed to compile regex for '%s': %s", self.name, exc)
            return False

        logger.info(
            "[Rule] Compiled regex for '%s' (flags=%s)", self.name, _flag_summary(re_flags)
        )
        return True

    def run(self, text: str) -> Tuple[bool, str]:
        """
        Run the rule on the given text.

        Args:
            text: Content to evaluate.

        Returns:
            (matched: bool, reason: str). Configuration problems (missing
            pattern, invalid regex, non-callable function, bad return types,
            unknown rule type) and exceptions raised by a rule function are
            reported as (False, <explanation>) rather than raised.
        """
        if self.rule_type == "regex":
            if not self.pattern:
                logger.warning("[Rule] '%s' missing regex pattern.", self.name)
                return False, "Invalid rule configuration: missing pattern"

            # Compile lazily if the rule was never registered through the engine.
            if self._compiled_regex is None and not self.compile_if_needed():
                return False, f"Invalid regex pattern: {self.pattern!r}"

            if self._compiled_regex is not None and self._compiled_regex.search(text):
                return True, f"Matched regex '{self.pattern}' → {self.description}"
            return False, "No match"

        if self.rule_type == "function":
            if not callable(self.function):
                logger.warning(
                    "[Rule] '%s' function is not callable (type=%s, value=%r)",
                    self.name, type(self.function).__name__, self.function
                )
                return False, "Invalid rule configuration: function not callable"

            try:
                matched, reason = self.function(text)
            except Exception as exc:
                # Boundary around user-supplied code: log with traceback, report as no-match.
                logger.exception("[Rule] '%s' function raised exception.", self.name)
                return False, f"Rule function raised exception: {exc!r}"

            if isinstance(matched, bool) and isinstance(reason, str):
                return matched, reason
            logger.warning("[Rule] '%s' function returned invalid types.", self.name)
            return False, "Invalid function return type; expected (bool, str)"

        logger.warning("[Rule] '%s' has unknown type '%s'.", self.name, self.rule_type)
        return False, f"Invalid rule configuration: unknown type '{self.rule_type}'"


@dataclass
class RuleResult:
    """
    Uniform per-rule outcome for UI/API consumption.

    result is "PASS" or "FAIL" (FAIL == matched True)
    """
    name: str
    description: str
    category: str
    result: str  # "PASS" | "FAIL"
    reason: Optional[str] = None  # populated only when the rule matched
    severity: Optional[str] = None
    tags: Optional[List[str]] = None


class RuleEngine:
    """
    Loads and executes rules against provided text.

    Rules are identified by the pair (category, name); registering the same
    pair twice is either ignored or replaces the earlier rule (see add_rule).
    """

    def __init__(self, rules: Optional[List[Rule]] = None):
        """
        Args:
            rules: Optional initial rule list.
        """
        # IMPORTANT: back the `rules` property with a private list
        self._rules = []
        self._rule_keys = set()   # {(category, name)}
        self._rule_index = {}     # (category, name) -> position in self._rules

        # Route through add_rule so de-duplication and regex compilation happen
        for rule in rules or ():
            self.add_rule(rule)

    def _ensure_tracking(self) -> None:
        """Create any missing bookkeeping structure and rebuild it from self._rules."""
        if not hasattr(self, "_rules"):
            self._rules = []
        if hasattr(self, "_rule_keys") and hasattr(self, "_rule_index"):
            return
        self._rule_keys = set()
        self._rule_index = {}
        for position, existing in enumerate(self._rules):
            existing_key = (existing.category, existing.name)
            self._rule_keys.add(existing_key)
            self._rule_index[existing_key] = position

    def add_rule(self, rule: Rule, replace: bool = False) -> None:
        """
        Add a new rule at runtime; compiles regex if needed and logs failures.

        Idempotent by (category, name):
          - If the same (category, name) is already present:
              * replace=False (default): ignore duplicate and warn.
              * replace=True: replace the existing rule in place and recompile regex.

        Args:
            rule: Rule to add.
            replace: If True, overwrite an existing rule with the same (category, name).
        """
        # Tracking structures may be absent if __init__ was bypassed
        self._ensure_tracking()

        key = (rule.category, rule.name)
        already_registered = key in self._rule_keys

        if already_registered and not replace:
            logger.warning(
                "[Rules] Duplicate registration ignored: %s/%s", rule.category, rule.name
            )
            return

        position = self._rule_index.get(key) if already_registered else None
        if position is None:
            # New rule (or a known key whose index entry is missing): append
            self._rules.append(rule)
            self._rule_keys.add(key)
            self._rule_index[key] = len(self._rules) - 1
        else:
            # Replace existing rule in place, keeping its evaluation order
            self._rules[position] = rule

        if rule.rule_type == "regex" and not rule.compile_if_needed():
            logger.warning(
                "[Engine] Regex failed when %s rule '%s' (pattern=%r)",
                "replacing" if already_registered else "adding",
                rule.name, rule.pattern
            )

        # Logged after the mutation so the reported count reflects the new state
        if getattr(settings.logconfig, "log_rule_loads", False):
            logger.info(
                "[engine] add_rule: %s/%s replace=%s -> count=%d",
                rule.category, rule.name, bool(replace), len(self._rules)
            )

    # helper, not used ATM
    def add_rules(self, rules: list[Rule], replace: bool = False) -> None:
        """
        Add many rules safely (idempotent). Uses the same semantics as add_rule.
        """
        for rule in rules:
            self.add_rule(rule, replace=replace)

    def _normalize_for_text_rules(self, s: str) -> str:
        """Return `s` NFKC-normalized with whitespace runs collapsed; '' for empty input."""
        if not s:
            return ""
        s = unicodedata.normalize("NFKC", s)
        # collapse whitespace; keeps word boundaries sensible
        s = _re.sub(r"\s+", " ", s).strip()
        return s

    def _log_dispatch(self, category: Optional[str]) -> None:
        """Log which categories a run applies and how many rules they select."""
        cat_counts = Counter(r.category for r in self._rules)

        if category is None:
            # key=str keeps the sort safe if categories are not all strings
            selected_categories = sorted(cat_counts, key=str)
        else:
            selected_categories = [category]

        selected_rule_count = sum(cat_counts[c] for c in selected_categories)

        logger.info(
            "[engine] applying categories: %s | selected_rules=%d | totals=%s",
            ",".join(str(c) for c in selected_categories),
            selected_rule_count,
            dict(cat_counts),
        )

    def run_all(self, text: str, category: Optional[str] = None) -> List[Dict]:
        """
        Run all rules against text.

        Args:
            text: The content to test.
            category: If provided, only evaluate rules that match this category.
                When it is "text", the input is normalized (NFKC + whitespace
                collapse) before evaluation.

        Returns:
            List of dicts with PASS/FAIL per rule (JSON-serializable).
        """
        # --- dispatch visibility --- if set to true, we log applied categories
        if getattr(settings.logconfig, "log_rule_dispatch", False):
            self._log_dispatch(category)

        # Normalize once up front rather than once per rule
        if category == "text":
            text = self._normalize_for_text_rules(text)

        # NOTE(review): this toggle is read from settings.app while the other
        # logging toggles are read from settings.logconfig — confirm intended.
        trace_each_rule = getattr(getattr(settings, "app", None), "log_rule_debug", False)

        results: List[Dict] = []

        # Iterate the backing list directly; the `rules` property copies on every access
        for rule in self._rules:
            if category is not None and rule.category != category:
                continue

            matched, reason = rule.run(text)

            # very fine-grained trace per rule
            if trace_each_rule:
                logger.info(
                    "[engine] eval: cat:%s - rule:%s - result: %s - reason:%s",
                    rule.category, rule.name, matched, reason
                )

            outcome = RuleResult(
                name=rule.name,
                description=rule.description,
                category=rule.category,
                result="FAIL" if matched else "PASS",
                reason=reason if matched else None,  # reason only reported on a match
                severity=rule.severity,
                tags=rule.tags,
            )
            results.append(asdict(outcome))

        logger.debug("[Engine] Completed evaluation. Returned %d rule results.", len(results))
        return results

    @property
    def rules(self) -> List[Rule]:
        """Read-only view (returns a shallow copy) of registered rules."""
        return list(self._rules)


def load_rules_from_yaml(yaml_file: Union[str, Path]) -> List[Rule]:
    """
    Load rules from a YAML file.

    Supports optional 'severity', 'tags' and 'flags' keys. Entries that are not
    mappings, or that lack 'name', 'description' or 'category', are skipped
    with a warning.

    Example YAML:
      - name: suspicious_eval
        description: "Use of eval() in script"
        category: script
        type: regex
        pattern: "\\beval\\("
        severity: medium
        tags: [obfuscation]

    Args:
        yaml_file: Path to the YAML rules file.

    Returns:
        List[Rule]

    Raises:
        ValueError: If the YAML document is not a list.
    """
    path = Path(yaml_file)
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    if not isinstance(data, list):
        logger.error("[Loader] Rules YAML must be a list of rule objects.")
        raise ValueError("Rules YAML must be a list of rule objects.")

    rules: List[Rule] = []
    for idx, item in enumerate(data):
        if not isinstance(item, dict):
            logger.warning(
                "[Loader] Skipping invalid rule at index %d: expected a mapping, got %s.",
                idx, type(item).__name__
            )
            continue

        name = item.get("name")
        description = item.get("description")
        category = item.get("category")

        if not name or not description or not category:
            logger.warning(
                "[Loader] Skipping invalid rule at index %d: missing required fields.", idx
            )
            continue

        tags = item.get("tags")
        flags = item.get("flags")

        rules.append(
            Rule(
                name=name,
                description=description,
                category=category,
                rule_type=item.get("type", "regex"),
                pattern=item.get("pattern"),
                function=None,  # function rules should be registered in code
                severity=item.get("severity"),
                tags=tags if isinstance(tags, list) else None,
                flags=flags if isinstance(flags, (str, list)) else None,
            )
        )

    return rules