"""
rules_engine.py

Flask-logger integrated rules engine for SneakyScope.

Logs go to `current_app.logger` when a Flask app context is active,
otherwise to a namespaced standard logger "sneakyscope.rules".
"""

import re
import logging
from dataclasses import dataclass, asdict, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union

try:
    # PyYAML is only needed by load_rules_from_yaml(); rules registered in
    # code can still be used when it is not installed.
    import yaml
except ImportError:
    yaml = None  # type: ignore

try:
    # Flask is optional; engine still works without it.
    from flask import current_app, has_app_context
except Exception:
    current_app = None  # type: ignore

    def has_app_context() -> bool:  # type: ignore
        """Stand-in used when Flask cannot be imported: never in a context."""
        return False


def get_engine_logger() -> logging.Logger:
    """
    Return a logger that prefers Flask's current_app.logger if available.
    Falls back to a namespaced standard logger otherwise.
    """
    if has_app_context() and current_app is not None and hasattr(current_app, "logger"):
        return current_app.logger
    return logging.getLogger("sneakyscope.rules")


@dataclass
class Rule:
    """
    Represents a single detection rule.

    When rule_type == 'regex', 'pattern' must be provided.
    When rule_type == 'function', 'function' must be provided and return
    (matched: bool, reason: str).
    """
    name: str
    description: str
    category: str
    rule_type: str = "regex"
    pattern: Optional[str] = None
    function: Optional[Callable[[str], Tuple[bool, str]]] = None
    severity: Optional[str] = None  # 'low' | 'medium' | 'high' (optional)
    tags: Optional[List[str]] = field(default=None)  # e.g., ['obfuscation', 'phishing'] (optional)

    # Internal compiled regex cache (excluded from repr and comparisons)
    _compiled_regex: Optional[re.Pattern] = field(default=None, repr=False, compare=False)

    def compile_if_needed(self, logger: Optional[logging.Logger] = None) -> bool:
        """
        Compile the regex pattern (case-insensitively) and cache it on the rule.

        Args:
            logger: Logger to report to; defaults to get_engine_logger().

        Returns:
            bool: True if the regex is compiled and ready, False otherwise
            (non-regex rule, missing pattern, or a pattern that cannot be
            compiled).
        """
        if logger is None:
            logger = get_engine_logger()

        if self.rule_type != "regex" or not self.pattern:
            return False

        try:
            self._compiled_regex = re.compile(self.pattern, re.IGNORECASE)
        except (re.error, TypeError, ValueError) as rex:
            # re.error: malformed pattern. TypeError / ValueError: the pattern
            # is not a plain string (e.g. a number coming from YAML, or an
            # already-compiled pattern). All of them mean "unusable rule",
            # which must be reported rather than crash the engine.
            self._compiled_regex = None
            logger.warning(f"[Rule] Failed to compile regex for '{self.name}': {rex}")
            return False

        logger.debug(f"[Rule] Compiled regex for '{self.name}'")
        return True

    def run(self, text: str, logger: Optional[logging.Logger] = None) -> Tuple[bool, str]:
        """
        Run the rule on the given text.

        Args:
            text: The content to test.
            logger: Logger to report to; defaults to get_engine_logger().

        Returns:
            (matched: bool, reason: str). Misconfigured rules and rule
            functions that raise or return the wrong types are reported as
            not matched, with the problem described in the reason.
        """
        if logger is None:
            logger = get_engine_logger()

        if self.rule_type == "regex":
            if not self.pattern:
                logger.warning(f"[Rule] '{self.name}' missing regex pattern.")
                return False, "Invalid rule configuration: missing pattern"
            if self._compiled_regex is None:
                # Lazily compile rules that were never compiled by an engine.
                compiled_ok = self.compile_if_needed(logger=logger)
                if not compiled_ok:
                    return False, f"Invalid regex pattern: {self.pattern!r}"
            if self._compiled_regex and self._compiled_regex.search(text):
                return True, f"Matched regex '{self.pattern}' → {self.description}"
            return False, "No match"

        if self.rule_type == "function":
            if callable(self.function):
                try:
                    matched, reason = self.function(text)
                    if isinstance(matched, bool) and isinstance(reason, str):
                        return matched, reason
                    logger.warning(f"[Rule] '{self.name}' function returned invalid types.")
                    return False, "Invalid function return type; expected (bool, str)"
                except Exception as exc:
                    # A faulty rule function must not abort the whole evaluation.
                    logger.exception(f"[Rule] '{self.name}' function raised exception.")
                    return False, f"Rule function raised exception: {exc!r}"
            logger.warning(f"[Rule] '{self.name}' has invalid function configuration.")
            return False, "Invalid rule configuration: function not callable"

        logger.warning(f"[Rule] '{self.name}' has unknown type '{self.rule_type}'.")
        return False, f"Invalid rule configuration: unknown type '{self.rule_type}'"


@dataclass
class RuleResult:
    """
    Uniform per-rule outcome for UI/API consumption.

    result is "PASS" or "FAIL" (FAIL == matched True)
    """
    name: str
    description: str
    category: str
    result: str  # "PASS" | "FAIL"
    reason: Optional[str] = None
    severity: Optional[str] = None
    tags: Optional[List[str]] = None


class RuleEngine:
    """
    Loads and executes rules against provided text, with Flask-aware logging.
    """

    def __init__(self, rules: Optional[List[Rule]] = None, logger: Optional[logging.Logger] = None):
        """
        Args:
            rules: Optional initial rule list.
            logger: Optional explicit logger. If None, uses Flask app logger if
                available, otherwise a namespaced standard logger.
        """
        if logger is None:
            self.logger = get_engine_logger()
        else:
            self.logger = logger
        self.rules: List[Rule] = rules or []
        self._compile_all()

    def _compile_all(self) -> None:
        """
        Compile all regex rules at initialization and warn about failures.
        """
        for rule in self.rules:
            if rule.rule_type != "regex":
                continue
            compiled_ok = rule.compile_if_needed(logger=self.logger)
            if not compiled_ok:
                self.logger.warning(f"[Engine] Regex failed at init for rule '{rule.name}' (pattern={rule.pattern!r})")

    def add_rule(self, rule: Rule) -> None:
        """
        Add a new rule at runtime; compiles regex if needed and logs failures.

        The rule is appended even when its regex fails to compile.
        """
        self.rules.append(rule)
        if rule.rule_type == "regex":
            compiled_ok = rule.compile_if_needed(logger=self.logger)
            if not compiled_ok:
                self.logger.warning(f"[Engine] Regex failed when adding rule '{rule.name}' (pattern={rule.pattern!r})")

    def run_all(self, text: str, category: Optional[str] = None) -> List[Dict]:
        """
        Run all rules against text.

        Args:
            text: The content to test.
            category: If provided, only evaluate rules that match this category.

        Returns:
            List of dicts with PASS/FAIL per rule (JSON-serializable). The
            'reason' entry is only populated for rules that matched (FAIL).
        """
        results: List[Dict] = []

        for rule in self.rules:
            if category is not None and rule.category != category:
                continue

            matched, reason = rule.run(text, logger=self.logger)

            rr = RuleResult(
                name=rule.name,
                description=rule.description,
                category=rule.category,
                result="FAIL" if matched else "PASS",
                reason=reason if matched else None,
                severity=rule.severity,
                tags=rule.tags,
            )
            results.append(asdict(rr))

        self.logger.debug(f"[Engine] Completed evaluation. Returned {len(results)} rule results.")
        return results


def load_rules_from_yaml(yaml_file: Union[str, Path], logger: Optional[logging.Logger] = None) -> List[Rule]:
    """
    Load rules from a YAML file.

    Supports optional 'severity' and 'tags' keys.

    Example YAML:
      - name: suspicious_eval
        description: "Use of eval() in script"
        category: script
        type: regex
        pattern: "\\beval\\("
        severity: medium
        tags: [obfuscation]

    Args:
        yaml_file: Path of the YAML file to read (UTF-8).
        logger: Logger to report to; defaults to get_engine_logger().

    Returns:
        List[Rule]: One rule per valid entry. Entries that are not mappings,
        or that lack 'name', 'description' or 'category', are skipped with a
        warning.

    Raises:
        ImportError: If PyYAML is not installed.
        ValueError: If the YAML document is not a list.
    """
    if logger is None:
        logger = get_engine_logger()

    if yaml is None:
        raise ImportError("PyYAML is required to load rules from a YAML file.")

    path = Path(yaml_file)
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    if not isinstance(data, list):
        logger.error("[Loader] Rules YAML must be a list of rule objects.")
        raise ValueError("Rules YAML must be a list of rule objects.")

    rules: List[Rule] = []
    for idx, item in enumerate(data):
        if not isinstance(item, dict):
            # A scalar or null entry has no .get(); skip it like any other
            # malformed rule instead of failing the whole load.
            logger.warning(f"[Loader] Skipping invalid rule at index {idx}: expected a mapping.")
            continue

        name = item.get("name")
        description = item.get("description")
        category = item.get("category")
        rule_type = item.get("type", "regex")
        pattern = item.get("pattern")
        severity = item.get("severity")
        tags = item.get("tags")

        if not name or not description or not category:
            logger.warning(f"[Loader] Skipping invalid rule at index {idx}: missing required fields.")
            continue

        rules.append(
            Rule(
                name=name,
                description=description,
                category=category,
                rule_type=rule_type,
                pattern=pattern,
                function=None,  # function rules should be registered in code
                severity=severity,
                tags=tags if isinstance(tags, list) else None,
            )
        )

    logger.info(f"[Loader] Loaded {len(rules)} rules from '{yaml_file}'.")
    return rules