diff --git a/app/__init__.py b/app/__init__.py
index a3d0c3b..87b30cb 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -4,10 +4,10 @@ from pathlib import Path
 from flask import Flask
 
 # Local imports
-from .utils.settings import get_settings
-from .logging_setup import wire_logging_once, get_app_logger, get_engine_logger
+from app.utils.settings import get_settings
+from app.logging_setup import wire_logging_once, get_app_logger
 
-from app.blueprints.ui import bp as main_bp        # ui blueprint
+from app.blueprints.main import bp as main_bp      # ui blueprint
 from app.blueprints.api import api_bp as api_bp    # api blueprint
 from app.blueprints.roadmap import bp as roadmap_bp  # roadmap
 
diff --git a/app/blueprints/ui.py b/app/blueprints/main.py
similarity index 95%
rename from app/blueprints/ui.py
rename to app/blueprints/main.py
index e861ade..383e41d 100644
--- a/app/blueprints/ui.py
+++ b/app/blueprints/main.py
@@ -134,15 +134,6 @@ def analyze():
         app_logger.error(f"Analysis failed for {url}: {e}")
         return redirect(url_for("main.index"))
 
-    # Add enrichment safely
-    try:
-        enrichment = enrich_url(url)
-        result["enrichment"] = enrichment
-        app_logger.info(f"[+] Enrichment added for {url}")
-    except Exception as e:
-        result["enrichment"] = {}
-        app_logger.warning(f"[!] Enrichment failed for {url}: {e}")
-
     # Redirect to permalink page for this run
     return redirect(url_for("main.view_result", run_uuid=result["uuid"]))
 
diff --git a/app/config/settings.yaml b/app/config/settings.yaml
index 196e2e5..5bccd29 100644
--- a/app/config/settings.yaml
+++ b/app/config/settings.yaml
@@ -2,7 +2,15 @@ app:
   name: SneakyScope
   version_major: 0
   version_minor: 1
-  print_rule_loads: True
+
+  # logs when rules are loaded
+  log_rule_loads: False
+
+  # logs each category of rules run
+  log_rule_dispatch: False
+
+  # logs rule pass/fail per rule
+  log_rule_debug: False
 
 cache:
   recent_runs_count: 10
diff --git a/app/config/suspicious_rules.yaml b/app/config/suspicious_rules.yaml
index 662dd8a..4f49b68 100644
--- a/app/config/suspicious_rules.yaml
+++ b/app/config/suspicious_rules.yaml
@@ -96,39 +96,49 @@
   severity: high
   tags: [credentials, form]
 
-# --- Text Rules (Social Engineering / BEC) ---
-- name: urgent_request
-  description: "Language suggesting urgency (common in phishing/BEC)"
-  category: text
-  type: regex
-  pattern: '\b(urgent|immediately|asap|action\s*required|verify\s*now)\b'
-  severity: medium
-  tags: [bec, urgency]
+# --- Text Rules (Social Engineering / BEC / Lures) ---
 
-- name: account_suspension
-  description: "Threat of account suspension/closure"
+- name: identity_verification_prompt
+  description: "Prompts to verify identity/account/email, often gating access"
   category: text
   type: regex
-  pattern: '\b(account\s*(suspend|closure|close)|verify\s*account)\b'
+  # e.g., "verify your identity", "confirm your email", "validate account"
+  pattern: '\b(verify|confirm|validate)\s+(?:your\s+)?(identity|account|email)\b'
+  flags: [i]
   severity: medium
-  tags: [bec, scare-tactics]
+  tags: [bec, verification, gating]
 
-- name: financial_request
-  description: "Request for gift cards, wire transfer, or money"
+- name: gated_document_access
+  description: "Language gating document access behind an action"
   category: text
   type: regex
-  pattern: '\b(gift\s*card|wire\s*transfer|bank\s*account|bitcoin|crypto|payment\s*required)\b'
-  severity: high
-  tags: [bec, financial]
+  # e.g., "access your secure document", "unlock document", "view document" + action verbs nearby
+  pattern: '(secure|confidential)\s+document|access\s+(?:the|your)?\s*document|unlock\s+document'
+  flags: [i]
+  severity: medium
+  tags: [lure, document]
+
+- name: email_collection_prompt
+  description: "Explicit prompt to enter/provide an email address to proceed"
+  category: text
+  type: regex
+  # e.g., "enter your email address", "provide email", "use your email to continue"
+  pattern: '\b(enter|provide|use)\s+(?:your\s+)?email(?:\s+address)?\b'
+  flags: [i]
+  severity: low
+  tags: [data-collection, email]
 
 - name: credential_reset
-  description: "Password reset or credential reset wording"
+  description: "Password/credential reset or login-to-continue wording"
   category: text
   type: regex
-  pattern: '\b(reset\s*password|update\s*credentials|log\s*in\s*to\s*verify|password\s*expiry)\b'
+  # includes: reset password, update credentials, log in to (verify|view|access), password expiry/expiration
+  pattern: '\b(reset\s*password|update\s*credentials|log\s*in\s*to\s*(?:verify|view|access)|password\s*(?:expiry|expiration|expires))\b'
+  flags: [i]
   severity: medium
   tags: [bec, credentials]
+
 
 - name: suspicious_iframe
   description: "Iframe tag present (possible phishing/malvertising/drive-by)"
   category: text
diff --git a/app/rules/rules_engine.py b/app/rules/rules_engine.py
index 49129af..12b2d2d 100644
--- a/app/rules/rules_engine.py
+++ b/app/rules/rules_engine.py
@@ -3,7 +3,8 @@ rules_engine.py
 """
 
 import re
-import logging
+import unicodedata
+from collections import Counter
 from dataclasses import dataclass, asdict, field
 from pathlib import Path
 from typing import Callable, Dict, List, Optional, Tuple, Union
@@ -11,6 +12,18 @@ from typing import Callable, Dict, List, Optional, Tuple, Union
 from app.logging_setup import get_engine_logger
 from app.utils.settings import get_settings
 
+import re as _re
+
+FLAG_MAP = {
+    "i": _re.IGNORECASE, "ignorecase": _re.IGNORECASE,
+    "m": _re.MULTILINE, "multiline": _re.MULTILINE,
+    "s": _re.DOTALL, "dotall": _re.DOTALL, "singleline": _re.DOTALL,
+    "x": _re.VERBOSE, "verbose": _re.VERBOSE,
+    "a": _re.ASCII, "ascii": _re.ASCII,
+    "u": _re.UNICODE, "unicode": _re.UNICODE,
+    "l": _re.LOCALE, "locale": _re.LOCALE,
+}
+
 settings = get_settings()
 
 import yaml
@@ -49,20 +62,64 @@ class Rule:
         """
         Compile the regex pattern once for performance, if applicable.
 
+        Behavior:
+          - Uses flags specified on the rule (list like ['i','m'] or a string like 'im').
+          - If the rule category is 'text' and no 'i' flag is set, defaults to IGNORECASE.
+          - Stores the compiled object on self._compiled_regex.
+
        Returns:
            bool: True if the regex is compiled and ready, False otherwise.
        """
+        if getattr(self, "rule_type", None) != "regex" or not getattr(self, "pattern", None):
+            return False
+
+        re_flags = 0
+
+        # Collect flags from the rule, if any (supports "ims" or ["i","m","s"])
+        raw_flags = getattr(self, "flags", None)
+        if isinstance(raw_flags, str):
+            for ch in raw_flags:
+                mapped = FLAG_MAP.get(ch.lower())
+                if mapped is not None:
+                    re_flags |= mapped
+                else:
+                    logger.warning("[Rule] Unknown regex flag %r on rule '%s'", ch, getattr(self, "name", "?"))
+        elif isinstance(raw_flags, (list, tuple, set)):
+            for fl in raw_flags:
+                key = str(fl).lower()
+                mapped = FLAG_MAP.get(key)
+                if mapped is not None:
+                    re_flags |= mapped
+                else:
+                    logger.warning("[Rule] Unknown regex flag %r on rule '%s'", fl, getattr(self, "name", "?"))
+
+        # Default IGNORECASE for text rules if not explicitly provided
+        cat = (getattr(self, "category", "") or "").lower().strip()
+        if cat == "text" and not (re_flags & _re.IGNORECASE):
+            re_flags |= _re.IGNORECASE
+
+        try:
+            self._compiled_regex = _re.compile(self.pattern, re_flags)
+
+            # Build a compact flag summary inline (e.g., 'ims' or '-' if none)
+            flag_parts = []
+            if re_flags & _re.IGNORECASE: flag_parts.append("i")
+            if re_flags & _re.MULTILINE: flag_parts.append("m")
+            if re_flags & _re.DOTALL: flag_parts.append("s")
+            if re_flags & _re.VERBOSE: flag_parts.append("x")
+            if re_flags & _re.ASCII: flag_parts.append("a")
+            if re_flags & _re.UNICODE: flag_parts.append("u")
+            if re_flags & _re.LOCALE: flag_parts.append("l")
+            flag_summary = "".join(flag_parts) if flag_parts else "-"
+
+            logger.info("[Rule] Compiled regex for '%s' (flags=%s)", getattr(self, "name", "?"), flag_summary)
+            return True
+
+        except (_re.error, ValueError) as rex:  # ValueError: e.g. LOCALE flag on a str pattern
+            self._compiled_regex = None
+            logger.warning("[Rule] Failed to compile regex for '%s': %s", getattr(self, "name", "?"), rex)
+            return False
-        if self.rule_type == "regex" and self.pattern:
-            try:
-                self._compiled_regex = re.compile(self.pattern, re.IGNORECASE)
-                logger.debug(f"[Rule] Compiled regex for '{self.name}'")
-                return True
-            except re.error as rex:
-                self._compiled_regex = None
-                logger.warning(f"[Rule] Failed to compile regex for '{self.name}': {rex}")
-                return False
-        return False
 
 
     def run(self, text: str) -> Tuple[bool, str]:
        """
@@ -198,7 +255,7 @@ class RuleEngine:
                 )
                 return
 
-        if settings.app.print_rule_loads:
+        if settings.app.log_rule_loads:
             logger.info(
                 "[engine] add_rule: %s/%s replace=%s -> count=%d",
                 rule.category, rule.name, bool(replace), len(self._rules)
@@ -230,6 +287,14 @@
             self.add_rule(rules[i], replace=replace)
             i = i + 1
 
+    def _normalize_for_text_rules(self, s: str) -> str:
+        if not s:
+            return ""
+        s = unicodedata.normalize("NFKC", s)
+        # collapse whitespace; keeps word boundaries sensible
+        s = _re.sub(r"\s+", " ", s).strip()
+        return s
+
     def run_all(self, text: str, category: Optional[str] = None) -> List[Dict]:
         """
         Run all rules against text.
@@ -241,6 +306,30 @@
         Returns:
             List of dicts with PASS/FAIL per rule (JSON-serializable).
         """
+
+        # --- dispatch visibility --- if set to true, we log applied categories
+        if getattr(settings.app, "log_rule_dispatch", False):
+            all_cats = [r.category for r in self._rules]
+            cat_counts = Counter(all_cats)
+            # Which categories are being applied this run?
+            if category is None:
+                selected_categories = sorted(cat_counts.keys())
+            else:
+                selected_categories = [category]
+
+            # How many rules match the selection?
+            selected_rule_count = sum(1 for r in self._rules if r.category in selected_categories)
+            try:
+                logger.info(
+                    "[engine] applying categories: %s | selected_rules=%d | totals=%s",
+                    ",".join(selected_categories),
+                    selected_rule_count,
+                    dict(cat_counts),
+                )
+            except Exception:
+                pass
+        # --- end dispatch visibility ---
+
         results: List[Dict] = []
 
         index = 0
@@ -248,12 +337,20 @@
         while index < total:
             rule = self.rules[index]
 
+            # if we are running a text rule, let's normalize the text.
+            if category == "text":
+                text = self._normalize_for_text_rules(text)
+
             if category is not None and rule.category != category:
                 index = index + 1
                 continue
 
             matched, reason = rule.run(text)
 
+            # very fine-grained trace per rule:
+            if getattr(settings.app, "log_rule_debug", False):
+                logger.info(f"[engine] eval: cat:{rule.category} - rule:{rule.name} - result: {matched} - reason:{reason}")
+
             result_str = "FAIL" if matched else "PASS"
             reason_to_include: Optional[str]
             if matched:
diff --git a/app/templates/partials/result_enrichment.html b/app/templates/partials/result_enrichment.html
index 81be05e..f399c23 100644
--- a/app/templates/partials/result_enrichment.html
+++ b/app/templates/partials/result_enrichment.html
@@ -54,5 +54,5 @@

No enrichment data available.

{% endif %} -

Back to top

+

Back to top

\ No newline at end of file diff --git a/app/templates/partials/result_forms.html b/app/templates/partials/result_forms.html index ca53ad8..d34f81f 100644 --- a/app/templates/partials/result_forms.html +++ b/app/templates/partials/result_forms.html @@ -109,5 +109,5 @@

No form issues detected.

{% endif %} -

Back to top

+

Back to top

\ No newline at end of file diff --git a/app/templates/partials/result_scripts.html b/app/templates/partials/result_scripts.html index b7f8934..9259422 100644 --- a/app/templates/partials/result_scripts.html +++ b/app/templates/partials/result_scripts.html @@ -116,5 +116,5 @@

No suspicious scripts detected.

{% endif %} -

Back to top

+

Back to top

\ No newline at end of file diff --git a/app/templates/partials/result_ssl_tls.html b/app/templates/partials/result_ssl_tls.html index f48c892..05a97fe 100644 --- a/app/templates/partials/result_ssl_tls.html +++ b/app/templates/partials/result_ssl_tls.html @@ -193,7 +193,7 @@ {% endif %} -

Back to top

+

Back to top

{% endmacro %} diff --git a/app/templates/partials/result_text.html b/app/templates/partials/result_text.html new file mode 100644 index 0000000..a1e54f4 --- /dev/null +++ b/app/templates/partials/result_text.html @@ -0,0 +1,120 @@ + +
+

Text

+ + {% if suspicious_text and suspicious_text|length > 0 %} +
+ + + + + + + + + + + + + + + + + + + {% for rec in suspicious_text %} + + + + + + + + + + + + + + + + + {% endfor %} + +
SourceIndicatorsTagsMatches (Rules)Text Snippet
+ {{ (rec.type or 'page')|title }} + + {{ rec.rules|length if rec.rules else 0 }} + + {% set ns = namespace(tags=[]) %} + {% if rec.rules %} + {% for r in rec.rules %} + {% if r.tags %} + {% for t in r.tags %} + {% if t not in ns.tags %} + {% set ns.tags = ns.tags + [t] %} + {% endif %} + {% endfor %} + {% endif %} + {% endfor %} + {% endif %} + {% if ns.tags and ns.tags|length > 0 %} +
+ {% for t in ns.tags %} + {{ t }} + {% endfor %} +
+ {% else %} + None + {% endif %} +
+ {% if rec.rules and rec.rules|length > 0 %} +
    + {% for r in rec.rules %} +
  • + {{ r.name }} + {% if r.severity %} + {% set sev = r.severity|lower %} + + {{ r.severity|title }} + + {% endif %} + {% if r.tags %} + {% for t in r.tags %} + {{ t }} + {% endfor %} + {% endif %} + {% if r.description %} + — {{ r.description }} + {% endif %} +
  • + {% endfor %} +
+ {% else %} + N/A + {% endif %} +
+ {% if rec.content_snippet %} +
+ + View snippet ({{ rec.content_snippet|length }} chars) + +
{{ rec.content_snippet }}
+
+ {% else %} + N/A + {% endif %} +
+
+ + {% else %} +

No text issues detected.

+ {% endif %} + +

Back to top

+
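For reviewers tracing the data flow: each entry in suspicious_text that this partial renders is a dict produced by Browser.analyze_text (added in the browser.py hunk further down). The sketch below is illustrative only, with invented values; it mirrors the field names used in the template (rec.type, rec.rules, rec.content_snippet) and shows, in plain Python, the order-preserving tag de-duplication that the Jinja namespace loop performs.

# Illustrative only -- mirrors the record shape rendered by result_text.html.
# Field names come from the template above; the values are made up.
from typing import Any, Dict, List

sample_record: Dict[str, Any] = {
    "type": "page",
    "content_snippet": "verify your identity … access your secure document",
    "rules": [
        {"name": "identity_verification_prompt", "severity": "medium",
         "tags": ["bec", "verification", "gating"], "description": "..."},
        {"name": "gated_document_access", "severity": "medium",
         "tags": ["lure", "document"], "description": "..."},
    ],
}

def dedupe_tags(record: Dict[str, Any]) -> List[str]:
    """Order-preserving union of rule tags, like the Jinja namespace loop."""
    seen: List[str] = []
    for rule in record.get("rules") or []:
        for tag in rule.get("tags") or []:
            if tag not in seen:
                seen.append(tag)
    return seen

print(dedupe_tags(sample_record))
# ['bec', 'verification', 'gating', 'lure', 'document']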
diff --git a/app/templates/result.html b/app/templates/result.html index 6842fdb..7c22b18 100644 --- a/app/templates/result.html +++ b/app/templates/result.html @@ -15,6 +15,7 @@ Redirects Forms Suspicious Scripts + Suspicious Text Screenshot Source @@ -35,7 +36,7 @@ {{ request.host_url }}results/{{ uuid }}

-

Back to top

+

Back to top

@@ -73,23 +74,25 @@ {% else %}

No redirects detected.

{% endif %} -

Back to top

+

Back to top

{% include "partials/result_forms.html" %} - {% include "partials/result_scripts.html" %} + + {% include "partials/result_text.html" with context %} +

Screenshot

Screenshot -

Back to top

+

Back to top

@@ -102,7 +105,7 @@ View Source

-

Back to top

+

Back to top
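A quick way to sanity-check the reworked text rules is to run the patterns against a few sample lure phrases. The snippet below is a standalone sketch, not application code: the patterns are copied from the suspicious_rules.yaml hunk above, the sample strings are invented, and re.IGNORECASE stands in for the flags: [i] handling that compile_pattern now applies.

# Standalone sketch: exercises the new lure-oriented text patterns.
# Patterns copied from the suspicious_rules.yaml hunk above; sample phrases
# are invented test strings, not app data.
import re

patterns = {
    "identity_verification_prompt":
        r'\b(verify|confirm|validate)\s+(?:your\s+)?(identity|account|email)\b',
    "gated_document_access":
        r'(secure|confidential)\s+document|access\s+(?:the|your)?\s*document|unlock\s+document',
    "credential_reset":
        r'\b(reset\s*password|update\s*credentials|log\s*in\s*to\s*(?:verify|view|access)|password\s*(?:expiry|expiration|expires))\b',
}

samples = [
    "Please VERIFY your identity to continue",
    "Log in to view your secure document",
    "Your password expires today -- reset password now",
]

for text in samples:
    hits = [name for name, pat in patterns.items()
            if re.search(pat, text, re.IGNORECASE)]
    print(f"{text!r} -> {hits}")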

diff --git a/app/utils/browser.py b/app/utils/browser.py
index 560c7af..467588e 100644
--- a/app/utils/browser.py
+++ b/app/utils/browser.py
@@ -29,6 +29,7 @@ from typing import Any, Dict, List, Optional
 from urllib.parse import urlparse
 
 from bs4 import BeautifulSoup
+import re
 from flask import current_app
 from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError
 
@@ -85,64 +86,6 @@ class Browser:
             index = index + 1
         return summary
 
-    def run_rule_checks(self, text: str, category: str) -> Dict[str, Any]:
-        """
-        Run all rules for a given category against provided text, returning a table-friendly model.
-
-        Args:
-            text: Text to analyze (HTML, snippet, etc.)
-            category: One of 'form', 'script', 'text' (or any category your rules use)
-
-        Returns:
-            {
-              "checks": [
-                { "name": str, "description": str, "category": str,
-                  "result": "PASS"|"FAIL", "reason": Optional[str],
-                  "severity": Optional[str], "tags": Optional[List[str]] }, ...
-              ],
-              "summary": { "fail_count": int, "total_rules": int }
-            }
-        """
-        out: Dict[str, Any] = {"checks": [], "summary": {"fail_count": 0, "total_rules": 0}}
-        engine = self._get_rule_engine()
-
-        if engine is None:
-            return out
-
-        try:
-            engine_results = engine.run_all(text, category=category)  # list of dicts
-            index = 0
-            total = len(engine_results)
-            while index < total:
-                item = engine_results[index]
-                normalized = {
-                    "name": item.get("name"),
-                    "description": item.get("description"),
-                    "category": item.get("category"),
-                    "result": item.get("result"),      # "PASS" | "FAIL"
-                    "reason": item.get("reason"),      # present on FAIL by engine design
-                    "severity": item.get("severity"),
-                    "tags": item.get("tags"),
-                }
-                out["checks"].append(normalized)
-                index = index + 1
-
-            out["summary"] = self._summarize_results(out["checks"])
-        except Exception as exc:
-            # Preserve shape; record the error as a synthetic PASS (so UI doesn't break)
-            out["checks"].append({
-                "name": "engine_error",
-                "description": "Rule engine failed during evaluation",
-                "category": category,
-                "result": "PASS",
-                "reason": f"{exc}",
-                "severity": None,
-                "tags": None
-            })
-            out["summary"] = {"fail_count": 0, "total_rules": 1}
-
-        return out
-
     def build_rule_checks_overview(self, full_html_text: str) -> List[Dict[str, Any]]:
         """
         Build a top-level overview for the results page: runs each category across
@@ -376,6 +319,135 @@ class Browser:
 
         return results
 
+    def analyze_text(self, html: str) -> List[Dict[str, Any]]:
+        """
+        Extract visible page text and evaluate text rules.
+        Only include rows that matched at least one rule.
+
+        Returns a list with 0..1 records shaped like:
+            {
+                "type": "page",
+                "content_snippet": "",
+                "rules": [
+                    {"name": "...", "description": "...", "severity": "...", "tags": [...]},
+                    ...
+                ],
+            }
+        """
+        results: List[Dict[str, Any]] = []

+        # Short-circuit on missing html
+        if not html:
+            return results
+
+        # Extract visible text (strip scripts/styles)
+        try:
+            soup = BeautifulSoup(html, "lxml")
+            for tag in soup(["script", "style", "noscript", "template"]):
+                tag.decompose()
+            # Basic hidden cleanup (best-effort)
+            for el in soup.select('[hidden], [aria-hidden="true"]'):
+                el.decompose()
+
+            text = soup.get_text(separator=" ", strip=True)
+            if not text:
+                return results
+
+            # Normalize whitespace so regexes behave consistently
+            text = re.sub(r"\s+", " ", text).strip()
+
+        except Exception as exc:
+            # Keep consistency with your other analyzers
+            results.append({
+                "type": "page",
+                "heuristics": [f"Text extraction error: {exc}"]
+            })
+            return results
+
+        engine = self._get_rule_engine()
+        if engine is None:
+            return results
+
+        matches_for_record: List[Dict[str, Any]] = []
+        matched_phrases: List[str] = []  # order-preserving
+        seen_phrases = set()
+
+        # How many characters to show for the preview snippet
+        preview_len = getattr(settings.ui, "snippet_preview_len", 200)
+
+        try:
+            # 1) Regex rules over full page text
+            for r in engine.rules:
+                if getattr(r, "category", None) != "text":
+                    continue
+
+                rtype = getattr(r, "rule_type", None)
+                if rtype == "regex":
+                    ok, _reason = r.run(text)
+                    if not ok:
+                        continue
+
+                    # Try to pull matched words/phrases
+                    compiled = getattr(r, "_compiled_regex", None)
+                    if compiled is None and getattr(r, "pattern", None):
+                        try:
+                            compiled = re.compile(r.pattern, re.IGNORECASE)
+                        except re.error:
+                            compiled = None
+
+                    # Collect a few (deduped) matched phrases
+                    if compiled is not None:
+                        # limit per rule to avoid flooding
+                        per_rule_count = 0
+                        for m in compiled.finditer(text):
+                            phrase = m.group(0).strip()
+                            if phrase and phrase not in seen_phrases:
+                                matched_phrases.append(phrase)
+                                seen_phrases.add(phrase)
+                                per_rule_count += 1
+                                if per_rule_count >= 5:  # cap per rule
+                                    break
+
+                    matches_for_record.append({
+                        "name": getattr(r, "name", "unknown_rule"),
+                        "description": getattr(r, "description", "") or "",
+                        "severity": getattr(r, "severity", None),
+                        "tags": getattr(r, "tags", None),
+                    })
+
+                elif rtype == "function":
+                    # Optional: function-style rules can inspect the full text
+                    facts = {"text": text, "category": "text"}
+                    ok, reason = r.run(facts)
+                    if ok:
+                        matches_for_record.append({
+                            "name": getattr(r, "name", "unknown_rule"),
+                            "description": (reason or "") or getattr(r, "description", ""),
+                            "severity": getattr(r, "severity", None),
+                            "tags": getattr(r, "tags", None),
+                        })
+
+            if matches_for_record:
+                # Build the snippet from matched words/phrases
+                joined = " … ".join(matched_phrases) if matched_phrases else ""
+                if len(joined) > preview_len:
+                    joined = joined[:preview_len] + "…"
+
+                record: Dict[str, Any] = {
+                    "type": "page",
+                    "content_snippet": joined or None,
+                    "rules": matches_for_record,
+                }
+                results.append(record)
+
+        except Exception as exc:
+            results.append({
+                "type": "page",
+                "heuristics": [f"Text analysis error: {exc}"]
+            })
+
+        return results
+
     # -----------------------------------------------------------------------
     # Fetcher / Orchestrator
     # -----------------------------------------------------------------------
@@ -458,12 +530,15 @@
         # Read back saved source
         html_content = source_path.read_text(encoding="utf-8")
 
-        # Forms analysis (per-form rule checks)
+        # Forms analysis
         forms_info = self.analyze_forms(html_content, final_url)
 
-        # Scripts artifacts (no detection here)
+        # Scripts artifacts
         suspicious_scripts = self.analyze_scripts(html_content, base_url=final_url)
 
+        # suspicious text
+        flagged_text = self.analyze_text(html_content)
+
         # Enrichment
         enrichment = enrich_url(url, fetch_ssl_enabled)
 
@@ -486,7 +561,8 @@
             "scripts": scripts_seen,
             "forms": forms_info,
             "suspicious_scripts": suspicious_scripts,
-            "rule_checks": rule_checks_overview,  # table-ready for UI
+            "suspicious_text": flagged_text,
+            "rule_checks": rule_checks_overview,
             "enrichment": enrichment
         }
 
diff --git a/app/utils/settings.py b/app/utils/settings.py
index 593b98d..1f5acd7 100644
--- a/app/utils/settings.py
+++ b/app/utils/settings.py
@@ -63,7 +63,9 @@ class AppConfig:
     name: str = "MyApp"
     version_major: int = 1
     version_minor: int = 0
-    print_rule_loads: bool = False
+    log_rule_loads: bool = False
+    log_rule_dispatch: bool = False
+    log_rule_debug: bool = False
 
 
 @dataclass
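To summarize the flag plumbing end to end: a rule loaded from YAML may carry flags either as a string ("im") or as a list (["i", "m"]), and text-category rules fall back to IGNORECASE when no "i" flag is given. The helper below is a trimmed, standalone sketch that mirrors the FLAG_MAP/compile_pattern behaviour added in rules_engine.py; it is not the actual Rule class, and the example pattern is the email_collection_prompt regex from the rules file.

# Standalone sketch of the flag handling added in rules_engine.py above.
# FLAG_MAP is trimmed to the common flags; resolve_flags mirrors the rule's
# behaviour: accept "im" or ["i", "m"], default text rules to IGNORECASE.
import re
from typing import Iterable, Union

FLAG_MAP = {
    "i": re.IGNORECASE, "ignorecase": re.IGNORECASE,
    "m": re.MULTILINE, "multiline": re.MULTILINE,
    "s": re.DOTALL, "dotall": re.DOTALL, "singleline": re.DOTALL,
    "x": re.VERBOSE, "verbose": re.VERBOSE,
    "a": re.ASCII, "ascii": re.ASCII,
}

def resolve_flags(raw: Union[str, Iterable[str], None], category: str = "") -> int:
    flags = 0
    items = list(raw) if isinstance(raw, (list, tuple, set)) else list(raw or "")
    for item in items:
        mapped = FLAG_MAP.get(str(item).lower())
        if mapped is not None:
            flags |= mapped
    if category.lower().strip() == "text" and not (flags & re.IGNORECASE):
        flags |= re.IGNORECASE  # default for text rules
    return flags

# e.g. a rule loaded from YAML with `flags: [i]`
pattern = re.compile(r"\b(enter|provide|use)\s+(?:your\s+)?email(?:\s+address)?\b",
                     resolve_flags(["i"], category="text"))
print(bool(pattern.search("Enter your EMAIL address to continue")))  # True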