From 693f7d67b91b303d0589e737970f2651ab518f52 Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Thu, 21 Aug 2025 22:05:16 -0500 Subject: [PATCH] feat: HTTPS auto-normalization; robust TLS intel UI; global rules state; clean logging; preload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add SSL/TLS intelligence pipeline: - crt.sh lookup with expired-filtering and root-domain wildcard resolution - live TLS version/cipher probe with weak/legacy flags and probe notes - UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists - Front page: checkbox to optionally fetch certificate/CT data - Introduce `URLNormalizer` with punycode support and typo repair - Auto-prepend `https://` for bare domains (e.g., `google.com`) - Optional quick HTTPS reachability + `http://` fallback - Provide singleton via function-cached `@singleton_loader`: - `get_url_normalizer()` reads defaults from Settings (if present) - Standardize function-rule return shape to `(bool, dict|None)` across `form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.) 
- Harden `FunctionRuleAdapter`: - Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs - Adapt non-dict inputs to facts (category-aware and via provided adapter) - Return `(True, dict)` on match, `(False, None)` on miss - Bind-time logging with file:line + function id for diagnostics - `RuleEngine`: - Back rules by private `self._rules`; `rules` property returns copy - Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile - Fix AttributeError from property assignment during `__init__` - Replace hidden singleton factory with explicit builder + global state: - `app/rules/factory.py::build_rules_engine()` builds and logs totals - `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the single source of truth - `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()` - Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts - Unify logging wiring: - `wire_logging_once(app)` clears and attaches a single handler chain - Create two named loggers: `sneakyscope.app` and `sneakyscope.engine` - Disable propagation to prevent dupes; include pid/logger name in format - Remove stray/duplicate handlers and import-time logging - Optional dedup filter for bursty repeats (kept off by default) - Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration - Document foreground vs background log “double consumer” caveat (attach vs `compose logs`) - Jinja: replace `{% return %}` with structured `if/elif/else` branches - Add toggle button to show raw JSON for TLS/CT section - Consumers should import the rules engine via: - `from app.state import get_rules_engine` - Use `build_rules_engine()` **only** during preload/init to construct the instance, then publish with `set_rules_engine()`. Do not call old singleton factories. 
- New/changed modules (high level): - `app/utils/url_tools.py` (+) — URLNormalizer + `get_url_normalizer()` - `app/rules/function_rules.py` (±) — normalized payload returns - `app/rules/function_rules.py::FunctionRuleAdapter` (±) — coercion, fact adaptation, bind logs - `app/rules/rules_engine.py` (±, moved from `app/utils/`) — `_rules`, idempotent `add_rule`, fixes - `app/rules/factory.py` (+) — pure builder; totals logged post-registration - `app/state.py` (+) — process-global rules engine - `app/logging_setup.py` (+) — single chain, two named loggers - `app/wsgi.py` (±) — preload build + `set_rules_engine()` - `entrypoint.sh` (±) — add `--preload` - templates (±) — TLS card, raw toggle; front-page checkbox Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init. --- app/__init__.py | 116 ++---------- app/blueprints/api.py | 8 +- app/blueprints/ui.py | 97 ++++++++-- app/config/settings.yaml | 1 + app/logging_setup.py | 61 ++++++ app/rules/factory.py | 51 +++++ app/rules/function_rules.py | 236 ++++++++++++++++++----- app/{utils => rules}/rules_engine.py | 171 +++++++++++------ app/state.py | 17 ++ app/static/style.css | 51 +++++ app/templates/_macros_ssl_tls.html | 182 ++++++++++++++++++ app/templates/index.html | 7 + app/templates/result.html | 21 ++- app/utils/browser.py | 12 +- app/utils/enrichment.py | 254 +++++++++++++++++++++++-- app/utils/io_helpers.py | 9 +- app/utils/settings.py | 1 + app/utils/tls_probe.py | 270 +++++++++++++++++++++++++++ app/utils/url_tools.py | 133 +++++++++++++ app/wsgi.py | 11 ++ entrypoint.sh | 1 + requirements.txt | 22 ++- 22 files changed, 1476 insertions(+), 256 deletions(-) create mode 100644 app/logging_setup.py create mode 100644 app/rules/factory.py rename app/{utils => rules}/rules_engine.py (64%) create mode 100644 app/state.py create mode 100644 app/templates/_macros_ssl_tls.html create mode 100644 app/utils/tls_probe.py create mode 100644 app/utils/url_tools.py diff --git a/app/__init__.py b/app/__init__.py index f59c5f2..4f447d7 
100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -5,26 +5,11 @@ from flask import Flask # Local imports from .utils.settings import get_settings -from .utils.rules_engine import RuleEngine, load_rules_from_yaml, Rule - -# our code based rules -from .rules.function_rules import ( - FactAdapter, - FunctionRuleAdapter, - script_src_uses_data_or_blob, - script_src_has_dangerous_extension, - script_third_party_host, - form_submits_to_different_host, - form_http_on_https_page, - form_action_missing, -) +from .logging_setup import wire_logging_once, get_app_logger, get_engine_logger from app.blueprints import ui # ui blueprint from app.blueprints import api # api blueprint -# from .utils import io_helpers # if need logging/setup later -# from .utils import cache_db # available for future injections - def create_app() -> Flask: """ Create and configure the Flask application instance. @@ -35,103 +20,23 @@ def create_app() -> Flask: # Basic app object app = Flask(__name__, template_folder="templates", static_folder="static") + # logging setup + wire_logging_once(app) + + app_logger = get_app_logger() + # Load settings (safe fallback to defaults if file missing) settings = get_settings() # Secret key loaded from env (warn if missing) app.secret_key = os.getenv("SECRET_KEY") if not app.secret_key: - app.logger.warning("[init] SECRET_KEY is not set; sessions may be insecure in production.") + app_logger.warning("[init] SECRET_KEY is not set; sessions may be insecure in production.") # Configure storage directory (bind-mount is still handled by sandbox.sh) sandbox_storage_default = Path("/data") app.config["SANDBOX_STORAGE"] = str(sandbox_storage_default) - # --------------------------- - # Suspicious Rules Engine - # --------------------------- - - # Determine rules file path relative to this package (allow env override) - base_dir = Path(__file__).resolve().parent - default_rules_path = base_dir / "config" / "suspicious_rules.yaml" - rules_path_str = 
os.getenv("SNEAKYSCOPE_RULES_FILE", str(default_rules_path)) - rules_path = Path(rules_path_str) - - # Create engine bound to Flask logger so all verbose/debug goes to app.logger - engine = RuleEngine(rules=[], logger=app.logger) - - # Try to load from YAML if present; log clearly if not - if rules_path.exists(): - try: - loaded_rules = load_rules_from_yaml(rules_path, logger=app.logger) - # Add rules one-by-one (explicit, clearer logs if any rule fails to compile) - index = 0 - total = len(loaded_rules) - while index < total: - engine.add_rule(loaded_rules[index]) - index = index + 1 - app.logger.info(f"[init] Loaded {len(loaded_rules)} suspicious rules from {rules_path}") - except Exception as e: - app.logger.warning(f"[init] Failed loading rules from {rules_path}: {e}") - else: - app.logger.warning(f"[init] Rules file not found at {rules_path}. Engine will start with zero rules.") - - # Built-in function-based rules - adapter = FactAdapter(logger=app.logger) - - engine.add_rule(Rule( - name="form_action_missing", - description="Form has no action attribute", - category="form", - rule_type="function", - function=FunctionRuleAdapter(form_action_missing, category="form", adapter=adapter), - )) - - engine.add_rule(Rule( - name="form_http_on_https_page", - description="Form submits via HTTP from HTTPS page", - category="form", - rule_type="function", - function=FunctionRuleAdapter(form_http_on_https_page, category="form", adapter=adapter), - )) - - engine.add_rule(Rule( - name="form_submits_to_different_host", - description="Form submits to a different host", - category="form", - rule_type="function", - function=FunctionRuleAdapter(form_submits_to_different_host, category="form", adapter=adapter), - )) - - # Script rules expect dict 'facts' (you’ll wire per-script facts later) - engine.add_rule(Rule( - name="script_src_uses_data_or_blob", - description="Script src uses data:/blob: URL", - category="script", - rule_type="function", - 
function=FunctionRuleAdapter(script_src_uses_data_or_blob, category="script", adapter=adapter), - )) - - engine.add_rule(Rule( - name="script_src_has_dangerous_extension", - description="External script with dangerous extension", - category="script", - rule_type="function", - function=FunctionRuleAdapter(script_src_has_dangerous_extension, category="script", adapter=adapter), - )) - - engine.add_rule(Rule( - name="script_third_party_host", - description="Script is from a third-party host", - category="script", - rule_type="function", - function=FunctionRuleAdapter(script_third_party_host, category="script", adapter=adapter), - )) - - # Store engine both ways: attribute (convenient) and config - app.rule_engine = engine - app.config["RULE_ENGINE"] = engine - # App metadata available to templates app.config["APP_NAME"] = settings.app.name app.config["APP_VERSION"] = f"v{settings.app.version_major}.{settings.app.version_minor}" @@ -140,9 +45,10 @@ def create_app() -> Flask: app.register_blueprint(ui.bp) app.register_blueprint(api.api_bp) + app_logger = get_app_logger() + # Example log lines so we know we booted cleanly - app.logger.info(f"SneakyScope started: {app.config['APP_NAME']} {app.config['APP_VERSION']}") - app.logger.info(f"SANDBOX_STORAGE: {app.config['SANDBOX_STORAGE']}") - app.logger.info(f"Registered {len(engine.rules)} total rules (YAML + function)") + app_logger.info(f"SneakyScope started: {app.config['APP_NAME']} {app.config['APP_VERSION']}") + app_logger.info(f"SANDBOX_STORAGE: {app.config['SANDBOX_STORAGE']}") return app diff --git a/app/blueprints/api.py b/app/blueprints/api.py index ba2a2bb..5c4bf37 100644 --- a/app/blueprints/api.py +++ b/app/blueprints/api.py @@ -30,12 +30,14 @@ import time from flask import Blueprint, request, jsonify, current_app, send_file, abort from pathlib import Path +from app.logging_setup import get_app_logger from app.utils.settings import get_settings from app.utils.external_fetcher import ExternalScriptFetcher from 
werkzeug.exceptions import HTTPException api_bp = Blueprint("api", __name__, url_prefix="/api") +app_logger = get_app_logger() def _resolve_results_path(job_id: str) -> str: """ @@ -83,7 +85,7 @@ def _api_500(err): Return JSON for server errors and log the exception. """ try: - current_app.logger.exception("API 500") + app_logger.exception("API 500") except Exception: pass return jsonify({"ok": False, "error": "internal server error"}), 500 @@ -107,7 +109,7 @@ def analyze_script(): script_url = (script_url_raw or "").strip() if isinstance(script_url_raw, str) else "" # log this request - current_app.logger.info(f"Got request to analyze {script_url} via API ") + app_logger.info(f"Got request to analyze {script_url} via API ") if not job_id or not script_url: return jsonify({"ok": False, "error": "Missing job_id (or uuid) or url"}), 400 @@ -174,7 +176,7 @@ def analyze_script(): findings = matched except Exception as exc: try: - current_app.logger.error("Rule engine error", extra={"error": str(exc)}) + app_logger.error("Rule engine error", extra={"error": str(exc)}) except Exception: pass findings = [] diff --git a/app/blueprints/ui.py b/app/blueprints/ui.py index 992ce3f..58a56f6 100644 --- a/app/blueprints/ui.py +++ b/app/blueprints/ui.py @@ -7,10 +7,14 @@ from pathlib import Path from datetime import datetime from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, send_file, abort +from app.utils.url_tools import get_url_normalizer from app.utils.browser import get_browser from app.utils.enrichment import enrich_url from app.utils.settings import get_settings from app.utils.io_helpers import get_recent_results +from app.logging_setup import get_app_logger + +app_logger = get_app_logger() bp = Blueprint("main", __name__) @@ -18,6 +22,47 @@ settings = get_settings() app_name = settings.app.name app_version = f"v {settings.app.version_major}.{settings.app.version_minor}" + +# --- data cleaner for tls to ensure data is standardized 
+def normalize_ssl_tls_for_view(ssl_tls): + """ + Normalize/guard the ssl_tls structure for template rendering. + Adds missing keys so Jinja doesn't need defensive checks everywhere. + """ + safe = {"crtsh": None, "probe": None, "error": None, "skipped": False, "reason": None} + + if not isinstance(ssl_tls, dict): + safe["error"] = "ssl_tls is not a dict" + return safe + + safe.update(ssl_tls) + + if safe.get("skipped") is True: + return safe # don’t force probe/crtsh keys when skipped + + # Probe guards + probe = safe.get("probe") or {} + if "results_by_version" not in probe or not isinstance(probe["results_by_version"], dict): + probe["results_by_version"] = {} + if "weak_protocols" not in probe or not isinstance(probe["weak_protocols"], list): + probe["weak_protocols"] = [] + if "weak_ciphers" not in probe or not isinstance(probe["weak_ciphers"], list): + probe["weak_ciphers"] = [] + if "errors" not in probe or not isinstance(probe["errors"], list): + probe["errors"] = [] + if "hostname" not in probe: + probe["hostname"] = None + if "port" not in probe: + probe["port"] = 443 + safe["probe"] = probe + + # crt.sh guards (we keep it mostly raw; macro only reads a few fields) + if "crtsh" not in safe: + safe["crtsh"] = None + + return safe + + # --- context processor --- @bp.context_processor def inject_app_info(): @@ -48,7 +93,7 @@ def index(): storage = Path(current_app.config["SANDBOX_STORAGE"]).resolve() # Build the recent list (non-fatal if storage is empty or unreadable) - recent_results = get_recent_results(storage, recent_count, current_app.logger) + recent_results = get_recent_results(storage, recent_count, app_logger) # Pass to template; your index.html will hide the card if list is empty return render_template("index.html", recent_results=recent_results) @@ -56,8 +101,23 @@ def index(): @bp.route("/analyze", methods=["POST"]) def analyze(): url = request.form.get("url", "").strip() - current_app.logger.info(f"[*] Analyzing {url}") - if not url: + + # 
Checkbox comes as '1' when checked, or None when not present + fetch_ssl = request.form.get("fetch_ssl") + fetch_ssl_enabled = bool(fetch_ssl == "1") + + normalizer = get_url_normalizer() + + try: + target = normalizer.normalize_for_analysis(url) + except ValueError: + app_logger.warning("Empty or invalid URL input") + return redirect(url_for("index")) + + app_logger.info(f"[*] Analyzing URL{target}") + app_logger.info(f"[*] SSL Checks set to {fetch_ssl_enabled}") + + if not target: flash("Please enter a URL.", "error") return redirect(url_for("main.index")) @@ -66,44 +126,57 @@ def analyze(): try: browser = get_browser() - result = asyncio.run(browser.fetch_page_artifacts(url)) - current_app.logger.info(f"[+] Analysis done for {url}") + result = asyncio.run(browser.fetch_page_artifacts(url,fetch_ssl_enabled=fetch_ssl_enabled)) + app_logger.info(f"[+] Analysis done for {url}") except Exception as e: flash(f"Analysis failed: {e}", "error") - current_app.logger.error(f"Analysis failed for {url}: {e}") + app_logger.error(f"Analysis failed for {url}: {e}") return redirect(url_for("main.index")) # Add enrichment safely try: enrichment = enrich_url(url) result["enrichment"] = enrichment - current_app.logger.info(f"[+] Enrichment added for {url}") + app_logger.info(f"[+] Enrichment added for {url}") except Exception as e: result["enrichment"] = {} - current_app.logger.warning(f"[!] Enrichment failed for {url}: {e}") + app_logger.warning(f"[!] Enrichment failed for {url}: {e}") # Redirect to permalink page for this run return redirect(url_for("main.view_result", run_uuid=result["uuid"])) @bp.route("/results/", methods=["GET"]) def view_result(run_uuid: str): + """ + View the analysis results for a given run UUID. + Loads results.json from SANDBOX_STORAGE/, + normalizes structures for template safety, and renders the result page. 
+ """ # Resolve SANDBOX_STORAGE from app config storage = Path(current_app.config["SANDBOX_STORAGE"]).resolve() run_dir = storage / run_uuid results_path = run_dir / "results.json" + # Ensure results exist if not results_path.exists(): - current_app.logger.error(f"Results not found for UUID: {run_uuid}") + app_logger.error(f"Results not found for UUID: {run_uuid}") abort(404) + # Load the results JSON with open(results_path, "r", encoding="utf-8") as f: result = json.load(f) - # Pass the UUID to the template for artifact links + # Add UUID so template can build artifact links result["uuid"] = run_uuid + # === Normalize SSL/TLS structure for safe rendering === + if "ssl_tls" in result: + result["ssl_tls"] = normalize_ssl_tls_for_view(result["ssl_tls"]) + + # Pass the enriched result dict to the template return render_template("result.html", **result) + @bp.route("/artifacts//", methods=["GET"]) def artifacts(run_uuid: str, filename: str): # Resolve SANDBOX_STORAGE from app config @@ -115,11 +188,11 @@ def artifacts(run_uuid: str, filename: str): try: full_path.relative_to(run_dir.resolve()) except ValueError: - current_app.logger.warning(f"Directory traversal attempt: {filename}") + app_logger.warning(f"Directory traversal attempt: {filename}") abort(404) if not full_path.exists(): - current_app.logger.error(f"Artifact not found: {filename} for UUID {run_uuid}") + app_logger.error(f"Artifact not found: {filename} for UUID {run_uuid}") abort(404) return send_file(full_path) diff --git a/app/config/settings.yaml b/app/config/settings.yaml index 05b58ef..196e2e5 100644 --- a/app/config/settings.yaml +++ b/app/config/settings.yaml @@ -2,6 +2,7 @@ app: name: SneakyScope version_major: 0 version_minor: 1 + print_rule_loads: True cache: recent_runs_count: 10 diff --git a/app/logging_setup.py b/app/logging_setup.py new file mode 100644 index 0000000..0bf5701 --- /dev/null +++ b/app/logging_setup.py @@ -0,0 +1,61 @@ +# app/logging_setup.py +import logging +import sys + 
+_LOGGING_WIRED = False # module-level guard + +def _clear(logger: logging.Logger) -> None: + for h in list(logger.handlers): + logger.removeHandler(h) + +def wire_logging_once(app) -> None: + global _LOGGING_WIRED + if _LOGGING_WIRED: + return + _LOGGING_WIRED = True + + # Reuse gunicorn handlers if present + guni = logging.getLogger("gunicorn.error") + + # Clear Flask's default handlers + try: + app.logger.handlers.clear() + except Exception: + for h in list(app.logger.handlers): + app.logger.removeHandler(h) + + for name in ("sneakyscope.app", "sneakyscope.engine"): + _clear(logging.getLogger(name)) + + + # Fallback formatter shows logger name to distinguish engine/app + h = logging.StreamHandler(sys.stdout) + fmt = logging.Formatter( + "[%(asctime)s] pid=%(process)d %(levelname)-8s %(name)s %(module)-18s %(message)s (line %(lineno)s)" + ) + h.setFormatter(fmt) + h.setFormatter(fmt) + h.setLevel(logging.INFO) + + # 3) Attach to app + project loggers; stop propagation everywhere + app.logger.addHandler(h); app.logger.setLevel(logging.INFO); app.logger.propagate = False + eng = logging.getLogger("sneakyscope.engine") + app_lg = logging.getLogger("sneakyscope.app") + eng.addHandler(h); eng.setLevel(logging.INFO); eng.propagate = False + app_lg.addHandler(h); app_lg.setLevel(logging.INFO); app_lg.propagate = False + + +def get_engine_logger() -> logging.Logger: + logger = logging.getLogger("sneakyscope.engine") + logger.propagate = False + if logger.level == logging.NOTSET: + logger.setLevel(logging.INFO) + return logger + + +def get_app_logger() -> logging.Logger: + logger = logging.getLogger("sneakyscope.app") + logger.propagate = False + if logger.level == logging.NOTSET: + logger.setLevel(logging.INFO) + return logger diff --git a/app/rules/factory.py b/app/rules/factory.py new file mode 100644 index 0000000..688b2c0 --- /dev/null +++ b/app/rules/factory.py @@ -0,0 +1,51 @@ +# app/rules/factory.py +from pathlib import Path + +from app.logging_setup import 
get_engine_logger +from app.rules.rules_engine import RuleEngine +from app.rules.rules_engine import Rule +from app.rules.function_rules import FunctionRuleAdapter +from app.rules.function_rules import ( + form_action_missing, form_http_on_https_page, form_submits_to_different_host, + script_src_uses_data_or_blob, script_src_has_dangerous_extension, script_third_party_host, +) + +from app.rules.rules_engine import load_rules_from_yaml + +base_dir = Path(__file__).resolve().parent.parent +RULES_FILE_PATH = base_dir / "config" / "suspicious_rules.yaml" + +log = get_engine_logger() + +def build_rules_engine() -> RuleEngine: + eng = RuleEngine() + + # 1) YAML rules + yaml_rules = load_rules_from_yaml(RULES_FILE_PATH) + for r in yaml_rules: + eng.add_rule(r) + log.info("Found %d suspicious rules from %s", + len(yaml_rules), getattr(yaml_rules, "source_path", "config")) + + # 2) Function rules + from app.rules.function_rules import FactAdapter + adapter = FactAdapter() + + def add(rule: Rule): + eng.add_rule(rule) + + add(Rule("form_action_missing", "Form has no action attribute", "form", "function", + FunctionRuleAdapter(form_action_missing, category="form", adapter=adapter, rule_name="form_action_missing"))) + add(Rule("form_http_on_https_page", "Form submits via HTTP from HTTPS page", "form", "function", + FunctionRuleAdapter(form_http_on_https_page, category="form", adapter=adapter, rule_name="form_http_on_https_page"))) + add(Rule("form_submits_to_different_host", "Form submits to a different host", "form", "function", + FunctionRuleAdapter(form_submits_to_different_host, category="form", adapter=adapter, rule_name="form_submits_to_different_host"))) + add(Rule("script_src_uses_data_or_blob", "Script src uses data:/blob: URL", "script", "function", + FunctionRuleAdapter(script_src_uses_data_or_blob, category="script", adapter=adapter, rule_name="script_src_uses_data_or_blob"))) + add(Rule("script_src_has_dangerous_extension", "External script with dangerous 
extension", "script", "function", + FunctionRuleAdapter(script_src_has_dangerous_extension, category="script", adapter=adapter, rule_name="script_src_has_dangerous_extension"))) + add(Rule("script_third_party_host", "Script is from a third-party host", "script", "function", + FunctionRuleAdapter(script_third_party_host, category="script", adapter=adapter, rule_name="script_third_party_host"))) + + log.info("Registered %d total rules (YAML + function)", len(eng.rules)) + return eng diff --git a/app/rules/function_rules.py b/app/rules/function_rules.py index 4e6baf8..2c91a91 100644 --- a/app/rules/function_rules.py +++ b/app/rules/function_rules.py @@ -19,10 +19,16 @@ Note: from __future__ import annotations -from typing import Any, Dict, Optional +from typing import Any, Callable, Dict, Optional, Tuple +import inspect +import logging from urllib.parse import urlparse -_NOOP_ACTIONS = {"", "#", "javascript:void(0)", "javascript:void(0);"} +from app.logging_setup import get_app_logger + +app_logger = get_app_logger() + +_NOOP_ACTIONS = {"", "#", "javascript:void(0)", "javascript:", "about:blank"} # --------------------------------------------------------------------------- # Adapters @@ -36,9 +42,6 @@ class FactAdapter: You can expand the per-category parsers over time as needed. """ - def __init__(self, logger: Optional[Any] = None) -> None: - self.logger = logger - def adapt(self, text_or_facts: Any, category: str = "") -> Dict[str, Any]: """ Adapt text_or_facts (str or dict) into a facts dict. 
@@ -65,13 +68,11 @@ class FactAdapter: elif category == "text": return {"category": "text", "raw": text_or_facts} else: - if self.logger: - self.logger.warning(f"[FactAdapter] Unknown category '{category}', returning raw snippet.") + app_logger.warning(f"[FactAdapter] Unknown category '{category}', returning raw snippet.") return {"category": category, "raw": text_or_facts} # Fallback for unrecognized input types - if self.logger: - self.logger.warning(f"[FactAdapter] Unsupported input type: {type(text_or_facts)!r}") + app_logger.warning(f"[FactAdapter] Unsupported input type: {type(text_or_facts)!r}") return {"category": category, "raw": text_or_facts} # ---- Per-category parsers ---- @@ -109,23 +110,149 @@ class FactAdapter: class FunctionRuleAdapter: """ - Callable wrapper that adapts engine input (str or dict) into 'facts' and then - invokes the underlying function rule that expects a facts dict. + Wraps a function-based rule so it ALWAYS returns: + - match: (True, Dict[str, Any]) + - no match: (False, None) - Usage: - wrapped = FunctionRuleAdapter(fn=form_action_missing, category="form", adapter=FactAdapter(app.logger)) - matched, reason = wrapped("action=https://...") # engine-friendly + Also adapts non-dict inputs into facts via a provided 'adapter' using a + duck-typed protocol, so callers can pass raw items (e.g., strings/nodes). 
""" - def __init__(self, fn, category: str = "", adapter: Optional[FactAdapter] = None) -> None: + def __init__( + self, + fn: Callable[[Dict[str, Any]], Any], + category: str, + adapter: Optional[Any] = None, + rule_name: Optional[str] = None, + logger: Optional[logging.Logger] = None, + ): self.fn = fn self.category = category - self.adapter = adapter or FactAdapter() + self.adapter = adapter + self.rule_name = rule_name or getattr(fn, "__name__", "") + - def __call__(self, text_or_facts: Any): - facts = self.adapter.adapt(text_or_facts, category=self.category) - return self.fn(facts) + # ---------- helpers ---------- + def _adapt_to_facts(self, raw: Any) -> Optional[Dict[str, Any]]: + """ + Convert whatever the engine passed into a facts dict. + Tries the provided adapter using a duck-typed protocol. + Returns a dict, or None if we can't adapt. + """ + # Already a dict? Use it. + if isinstance(raw, dict): + return raw + + # Try adapter if provided + if self.adapter is not None: + # Preferred generic signatures + for meth in ("build_facts", "facts", "to_facts"): + fn = getattr(self.adapter, meth, None) + if callable(fn): + try: + facts = fn(self.category, raw) + if isinstance(facts, dict): + return facts + except Exception as exc: + app_logger.exception("[Rule] '%s' adapter.%s failed: %s", self.rule_name, meth, exc) + + # Category-specific fallbacks: build__facts / _facts + cands = (f"build_{self.category}_facts", f"{self.category}_facts") + for meth in cands: + fn = getattr(self.adapter, meth, None) + if callable(fn): + try: + facts = fn(raw) + if isinstance(facts, dict): + return facts + except Exception as exc: + app_logger.exception("[Rule] '%s' adapter.%s failed: %s", self.rule_name, meth, exc) + + # No way to adapt + return None + + def _coerce_return(self, outcome: Any) -> Tuple[bool, Optional[Dict[str, Any]]]: + """ + Normalize rule function returns: + + accepted: + (bool, dict|None) + (bool, str) -> dict {'note': str} on match + (bool,) or bool -> (bool, 
None) + + On invalid shapes, treat as no-match. + """ + # Exact 2-tuple + if isinstance(outcome, tuple) and len(outcome) == 2: + matched = bool(outcome[0]) + raw = outcome[1] + + if not matched: + return False, None + + if raw is None: + return True, {} # match with empty payload is fine + if isinstance(raw, dict): + return True, raw + if isinstance(raw, str): + return True, {"note": raw} + + app_logger.warning("[Rule] '%s' returned payload of invalid type: %s", + self.rule_name, type(raw).__name__) + # Still treat as match but give minimal payload + return True, {"note": "coerced-invalid-payload", "value_repr": repr(raw)} + + # Legacy: (bool,) or bare bool + if isinstance(outcome, tuple) and len(outcome) == 1 and isinstance(outcome[0], bool): + return (True, {}) if outcome[0] else (False, None) + if isinstance(outcome, bool): + return (True, {}) if outcome else (False, None) + + # Junk -> no match + app_logger.warning("[Rule] '%s' returned invalid shape: %s", + self.rule_name, type(outcome).__name__) + return False, None + + # ---------- callable ---------- + + def __call__(self, raw: Any) -> Tuple[bool, Optional[Dict[str, Any]]]: + """ + Apply the wrapped rule to the provided item (raw or facts). + Returns: + (True, dict) on match + (False, None) on no match + """ + facts = self._adapt_to_facts(raw) + if facts is None: + app_logger.warning("[Rule] '%s' received non-dict facts (%s). 
Coercing to miss.", + self.rule_name, type(raw).__name__) + return False, None + + try: + outcome = self.fn(facts) + except Exception as exc: + app_logger.exception("[Rule] '%s' raised: %s", self.rule_name, exc) + return False, None + + matched, payload = self._coerce_return(outcome) + return matched, payload + + +def _hit(payload: Optional[Dict[str, Any]] = None) -> Tuple[bool, Optional[Dict[str, Any]]]: + """ + Standardize a positive match result: (True, dict) + """ + if payload is None: + payload = {} + return True, payload + + +def _miss() -> Tuple[bool, Optional[Dict[str, Any]]]: + """ + Standardize a negative match result: (False, None) + """ + return False, None # --------------------------------------------------------------------------- # Function-based rules (dict 'facts' expected) @@ -133,20 +260,25 @@ class FunctionRuleAdapter: # ---------------- Script rules ---------------- -def script_src_uses_data_or_blob(facts: Dict[str, Any]): +def script_src_uses_data_or_blob(facts: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]: """Flags + + + {% endblock %} \ No newline at end of file diff --git a/app/utils/browser.py b/app/utils/browser.py index e79862d..ea06b0f 100644 --- a/app/utils/browser.py +++ b/app/utils/browser.py @@ -35,10 +35,12 @@ from playwright.async_api import async_playwright, TimeoutError as PWTimeoutErro from app.utils.io_helpers import safe_write from app.utils.enrichment import enrich_url from app.utils.settings import get_settings +from app.logging_setup import get_app_logger # Load settings once for constants / defaults settings = get_settings() +logger = get_app_logger() class Browser: """ @@ -280,7 +282,7 @@ class Browser: except Exception as rule_exc: # Be defensive—bad rule shouldn't break the form pass try: - self.logger.debug("Form rule error", extra={"rule": getattr(r, "name", "?"), "error": str(rule_exc)}) + logger.debug("Form rule error", extra={"rule": getattr(r, "name", "?"), "error": str(rule_exc)}) except Exception: 
pass continue @@ -298,7 +300,7 @@ class Browser: except Exception as exc: # Keep analysis resilient try: - self.logger.error("Form analysis error", extra={"error": str(exc)}) + logger.error("Form analysis error", extra={"error": str(exc)}) except Exception: pass results.append({ @@ -390,7 +392,7 @@ class Browser: # ----------------------------------------------------------------------- # Fetcher / Orchestrator # ----------------------------------------------------------------------- - async def fetch_page_artifacts(self, url: str) -> Dict[str, Any]: + async def fetch_page_artifacts(self, url: str, fetch_ssl_enabled:bool=False) -> Dict[str, Any]: """ Fetch page artifacts and save them in a UUID-based directory for this Browser's storage_dir. @@ -476,7 +478,7 @@ class Browser: suspicious_scripts = self.analyze_scripts(html_content, base_url=final_url) # Enrichment - enrichment = enrich_url(url) + enrichment = enrich_url(url, fetch_ssl_enabled) # Global PASS/FAIL table per category (entire document) rule_checks_overview = self.build_rule_checks_overview(html_content) @@ -505,7 +507,7 @@ class Browser: safe_write(results_path, json.dumps(result, indent=2, ensure_ascii=False)) try: - current_app.logger.info(f"[browser] Saved results.json for run {run_uuid}") + logger.info(f"Saved results.json for run {run_uuid}") except Exception: pass diff --git a/app/utils/enrichment.py b/app/utils/enrichment.py index dea1378..70b0f4d 100644 --- a/app/utils/enrichment.py +++ b/app/utils/enrichment.py @@ -1,19 +1,25 @@ -import logging -from pathlib import Path from urllib.parse import urlparse import requests -import yaml +import json import whois from datetime import datetime from ipaddress import ip_address import socket +# Optional: high-accuracy root-domain detection if available (tldextract is in the requirements, but this is still useful) +try: + import tldextract + _HAS_TLDEXTRACT = True +except Exception: + _HAS_TLDEXTRACT = False + # Local imports from app.utils.cache_db 
import get_cache from app.utils.settings import get_settings +from app.utils.tls_probe import TLSEnumerator -# Configure logging -logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") +# Configure logger +from app.logging_setup import get_app_logger # Init cache cache = get_cache("/data/cache.db") @@ -25,32 +31,244 @@ days = 24 * 60 GEOIP_DEFAULT_TTL = settings.cache.geoip_cache_days * days WHOIS_DEFAULT_TTL = settings.cache.whois_cache_days * days -def enrich_url(url: str) -> dict: - """Perform WHOIS, GeoIP, and BEC word enrichment.""" - result = {} +logger = get_app_logger() + + + +def parse_target_to_host(target): + """ + Convert a user-supplied string (URL or domain) into a hostname. + + Returns: + str or None + """ + if target is None: + return None + + value = str(target).strip() + if value == "": + return None + + # urlparse needs a scheme to treat the first token as netloc + parsed = urlparse(value if "://" in value else f"http://{value}") + + # If the input was something like "localhost:8080/path", netloc includes the port + host = parsed.hostname + if host is None: + return None + + # Lowercase for consistency + host = host.strip().lower() + if host == "": + return None + + return host + +def get_root_domain(hostname): + """ + Determine the registrable/root domain from a hostname. + Prefers tldextract if available; otherwise falls back to a heuristic. 
+ + Examples: + sub.a.example.com -> example.com + portal.gov.uk -> portal.gov.uk with the PSL (gov.uk is a public suffix); the two-label fallback yields gov.uk + api.example.co.uk -> example.co.uk (PSL needed for correctness) + + Returns: + str (best-effort registrable domain) + """ + if hostname is None: + return None + + if _HAS_TLDEXTRACT: + # tldextract returns subdomain, domain, suffix separately using PSL rules + # e.g., sub= "api", domain="example", suffix="co.uk" + parts = tldextract.extract(hostname) + # If suffix is empty (e.g., localhost), fall back + if parts.suffix: + return f"{parts.domain}.{parts.suffix}".lower() + else: + return hostname.lower() + + # Fallback heuristic: last two labels (not perfect for multi-part TLDs, but safe) + # Drop empty labels (e.g., from a trailing or doubled dot) before picking the last two + labels = hostname.split(".") + labels = [lbl for lbl in labels if lbl] # allow simple cleanup without logic change + + if len(labels) >= 2: + last = labels[-1] + second_last = labels[-2] + candidate = f"{second_last}.{last}".lower() + return candidate + + return hostname.lower() + +def is_root_domain(hostname): + """ + Is the provided hostname the same as its registrable/root domain? + """ + if hostname is None: + return False + + root = get_root_domain(hostname) + if root is None: + return False + + return hostname.lower() == root.lower() + +def search_certs(domain, wildcard=True, expired=True, deduplicate=True): + """ + Search crt.sh for the given domain. 
+ + domain -- Domain to search for + wildcard -- Whether or not to prepend a wildcard to the domain + (default: True) + expired -- Whether or not to include expired certificates + (default: True) + + Return a list of objects, like so: + + { + "issuer_ca_id": 16418, + "issuer_name": "C=US, O=Let's Encrypt, CN=Let's Encrypt Authority X3", + "name_value": "hatch.uber.com", + "min_cert_id": 325717795, + "min_entry_timestamp": "2018-02-08T16:47:39.089", + "not_before": "2018-02-08T15:47:39" + } + """ + base_url = "https://crt.sh/?q={}&output=json" + if not expired: + base_url = base_url + "&exclude=expired" + if deduplicate: + base_url = base_url + "&deduplicate=Y" + if wildcard and "%" not in domain: + domain = "%.{}".format(domain) + url = base_url.format(domain) + + ua = 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1' + req = requests.get(url, headers={'User-Agent': ua}) + + if req.ok: + try: + content = req.content.decode('utf-8') + data = json.loads(content) + return data + except ValueError: + # crt.sh fixed their JSON response. This shouldn't be necessary anymore + # https://github.com/crtsh/certwatch_db/commit/f4f46ea37c23543c4cdf1a3c8867d68967641807 + data = json.loads("[{}]".format(content.replace('}{', '},{'))) + return data + except Exception as err: + logger.error("Error retrieving cert information from CRT.sh.") + return None + +def gather_crtsh_certs_for_target(target): + """ + Given a URL or domain-like input, return crt.sh results for: + - The exact hostname + - If hostname is a subdomain, also the wildcard for the root domain (e.g., *.example.com) + + We intentionally run this even if the scheme is HTTP (per your design). + Expired certs are excluded by default. + + Returns: + dict: + { + "input": , + "hostname": , + "root_domain": , + "is_root_domain": , + "crtsh": { + "host_certs": [... or None], + "wildcard_root_certs": [... 
or None] + } + } + """ + result = { + "input": target, + "hostname": None, + "root_domain": None, + "is_root_domain": False, + "crtsh": { + "host_certs": None, + "wildcard_root_certs": None + } + } + + try: + hostname = parse_target_to_host(target) + result["hostname"] = hostname + + if hostname is None: + return result + + root = get_root_domain(hostname) + result["root_domain"] = root + result["is_root_domain"] = is_root_domain(hostname) + + # Always query crt.sh for the specific hostname + # (expired=False means we filter expired) + host_certs = search_certs(hostname, wildcard=False, expired=False) + result["crtsh"]["host_certs"] = host_certs + + # If subdomain, also look up wildcard for the root domain: *.root + if not result["is_root_domain"] and root: + wildcard_certs = search_certs(root, wildcard=True, expired=False) + result["crtsh"]["wildcard_root_certs"] = wildcard_certs + + except Exception as exc: + logger.exception("crt.sh enrichment failed: %s", exc) + + return result + +def enrich_url(url: str, fetch_ssl_enabled:bool=False) -> dict: + """Perform WHOIS, GeoIP""" + enrichment = {} # Extract hostname parsed = urlparse(url) hostname = parsed.hostname or url # fallback if parsing fails # --- WHOIS --- - result.update(enrich_whois(hostname)) + enrichment.update(enrich_whois(hostname)) # --- GeoIP --- - result["geoip"] = enrich_geoip(hostname) + enrichment["geoip"] = enrich_geoip(hostname) - return result + # === SSL/TLS: crt.sh + live probe === + # if fetching ssl... 
+ if fetch_ssl_enabled: + try: + # 1) Certificate Transparency (already implemented previously) + crtsh_info = gather_crtsh_certs_for_target(url) + # 2) Live TLS probe (versions + negotiated cipher per version) + tls_enum = TLSEnumerator(timeout_seconds=5.0) + probe_result = tls_enum.probe(url) + + enrichment["ssl_tls"] = {} + enrichment["ssl_tls"]["crtsh"] = crtsh_info + enrichment["ssl_tls"]["probe"] = probe_result.to_dict() + + except Exception as exc: + logger.exception("SSL/TLS enrichment failed: %s", exc) + enrichment["ssl_tls"] = {"error": "SSL/TLS enrichment failed"} + else: + # Include a small marker so the UI can show “skipped” + enrichment["ssl_tls"] = {"skipped": True, "reason": "Disabled on submission"} + + return enrichment def enrich_whois(hostname: str) -> dict: """Fetch WHOIS info using python-whois with safe type handling.""" cache_key = f"whois:{hostname}" cached = cache.read(cache_key) if cached: - logging.info(f"[CACHE HIT] for WHOIS: {hostname}") + logger.info(f"[CACHE HIT] for WHOIS: {hostname}") return cached - logging.info(f"[CACHE MISS] for WHOIS: {hostname}") + logger.info(f"[CACHE MISS] for WHOIS: {hostname}") result = {} try: w = whois.whois(hostname) @@ -73,7 +291,7 @@ def enrich_whois(hostname: str) -> dict: } except Exception as e: - logging.warning(f"WHOIS lookup failed for {hostname}: {e}") + logger.warning(f"WHOIS lookup failed for {hostname}: {e}") try: # fallback raw whois text import subprocess @@ -81,14 +299,13 @@ def enrich_whois(hostname: str) -> dict: result["whois"] = {} result["raw_whois"] = raw_output except Exception as raw_e: - logging.error(f"Raw WHOIS also failed: {raw_e}") + logger.error(f"Raw WHOIS also failed: {raw_e}") result["whois"] = {} result["raw_whois"] = "N/A" cache.create(cache_key, result, WHOIS_DEFAULT_TTL) return result - def enrich_geoip(hostname: str) -> dict: """Resolve hostname to IPs and fetch info from ip-api.com.""" geo_info = {} @@ -98,11 +315,11 @@ def enrich_geoip(hostname: str) -> dict: 
cache_key = f"geoip:{ip_str}" cached = cache.read(cache_key) if cached: - logging.info(f"[CACHE HIT] for GEOIP: {ip}") + logger.info(f"[CACHE HIT] for GEOIP: {ip}") geo_info[ip_str] = cached continue - logging.info(f"[CACHE MISS] for GEOIP: {ip}") + logger.info(f"[CACHE MISS] for GEOIP: {ip}") try: resp = requests.get(f"http://ip-api.com/json/{ip_str}?fields=24313855", timeout=5) if resp.status_code == 200: @@ -116,7 +333,6 @@ def enrich_geoip(hostname: str) -> dict: return geo_info - def extract_ips_from_url(hostname: str): """Resolve hostname to IPs.""" try: diff --git a/app/utils/io_helpers.py b/app/utils/io_helpers.py index 0243be3..d5a39fc 100644 --- a/app/utils/io_helpers.py +++ b/app/utils/io_helpers.py @@ -1,9 +1,10 @@ import json -import logging from pathlib import Path from datetime import datetime -logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") +from app.logging_setup import get_app_logger + +logger = get_app_logger() def safe_write(path: Path | str, content: str, mode="w", encoding="utf-8"): """Write content to a file safely with logging.""" @@ -12,9 +13,9 @@ def safe_write(path: Path | str, content: str, mode="w", encoding="utf-8"): path.parent.mkdir(parents=True, exist_ok=True) with open(path, mode, encoding=encoding) as f: f.write(content) - logging.info(f"[+] Wrote file: {path}") + logger.info(f"[+] Wrote file: {path}") except Exception as e: - logging.error(f"[!] Failed writing {path}: {e}") + logger.error(f"[!] 
Failed writing {path}: {e}") raise def get_recent_results(storage_dir: Path, limit: int, logger) -> list[dict]: diff --git a/app/utils/settings.py b/app/utils/settings.py index 0b28e70..593b98d 100644 --- a/app/utils/settings.py +++ b/app/utils/settings.py @@ -63,6 +63,7 @@ class AppConfig: name: str = "MyApp" version_major: int = 1 version_minor: int = 0 + print_rule_loads: bool = False @dataclass diff --git a/app/utils/tls_probe.py b/app/utils/tls_probe.py new file mode 100644 index 0000000..5c750fb --- /dev/null +++ b/app/utils/tls_probe.py @@ -0,0 +1,270 @@ +import socket +import ssl +import time +import logging +from urllib.parse import urlparse + + +class TLSProbeResult: + """ + Container for the results of a TLS probe across protocol versions. + """ + + def __init__(self): + self.hostname = None + self.port = 443 + self.results_by_version = {} # e.g., {"TLS1.2": {"supported": True, "cipher": "TLS_AES_128_GCM_SHA256", ...}} + self.weak_protocols = [] # e.g., ["TLS1.0", "TLS1.1"] + self.weak_ciphers = [] # e.g., ["RC4-SHA"] + self.errors = [] # textual errors encountered during probing + + def to_dict(self): + """ + Convert the object to a serializable dictionary. + """ + output = { + "hostname": self.hostname, + "port": self.port, + "results_by_version": self.results_by_version, + "weak_protocols": self.weak_protocols, + "weak_ciphers": self.weak_ciphers, + "errors": self.errors + } + return output + + +class TLSEnumerator: + """ + Enumerate supported TLS versions for a server by attempting handshakes with constrained contexts. + Also collects the server-selected cipher for each successful handshake. + + Notes: + - We do NOT validate certificates; this is posture discovery, not trust verification. + - Cipher enumeration is limited to "what was negotiated with default cipher list" per version. + Deep cipher scanning (per-cipher attempts) can be added later if needed. 
+ """ + + def __init__(self, timeout_seconds=5.0): + self.timeout_seconds = float(timeout_seconds) + + def _build_context_for_version(self, tls_version_label): + """ + Build an SSLContext that only allows the specified TLS version. + """ + # Base client context + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + + # Disable certificate checks so we can probe misconfigured/self-signed endpoints + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + # Constrain to a single protocol version using minimum/maximum + # Map label -> ssl.TLSVersion + if tls_version_label == "TLS1.0" and hasattr(ssl.TLSVersion, "TLSv1"): + context.minimum_version = ssl.TLSVersion.TLSv1 + context.maximum_version = ssl.TLSVersion.TLSv1 + elif tls_version_label == "TLS1.1" and hasattr(ssl.TLSVersion, "TLSv1_1"): + context.minimum_version = ssl.TLSVersion.TLSv1_1 + context.maximum_version = ssl.TLSVersion.TLSv1_1 + elif tls_version_label == "TLS1.2" and hasattr(ssl.TLSVersion, "TLSv1_2"): + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.maximum_version = ssl.TLSVersion.TLSv1_2 + elif tls_version_label == "TLS1.3" and hasattr(ssl.TLSVersion, "TLSv1_3"): + context.minimum_version = ssl.TLSVersion.TLSv1_3 + context.maximum_version = ssl.TLSVersion.TLSv1_3 + else: + # Version not supported by this Python/OpenSSL build + return None + + # Keep default cipher list; we only want to see what is negotiated + # You can later set context.set_ciphers("...") for deeper scans. + return context + + def _attempt_handshake(self, hostname, port, context): + """ + Attempt a TLS handshake to (hostname, port) using the given context. 
+ Returns a tuple: (supported(bool), selected_cipher(str or None), elapsed_seconds(float or None), error(str or None)) + """ + supported = False + selected_cipher = None + elapsed = None + error_text = None + + # Create a TCP connection with a timeout + sock = None + ssock = None + start = None + try: + # Resolve and connect + # Note: create_connection will handle IPv4/IPv6 resolution + sock = socket.create_connection((hostname, port), timeout=self.timeout_seconds) + + # Start timer right before TLS wrap to capture handshake duration mainly + start = time.time() + + # SNI is important: pass server_hostname + ssock = context.wrap_socket(sock, server_hostname=hostname) + + # Access negotiated cipher; returns (cipher_name, protocol, secret_bits) + cipher_info = ssock.cipher() + if cipher_info is not None and len(cipher_info) >= 1: + selected_cipher = str(cipher_info[0]) + + supported = True + elapsed = time.time() - start + + except Exception as exc: + # Capture the error for diagnostics + error_text = f"{type(exc).__name__}: {str(exc)}" + elapsed = None + finally: + # Clean up sockets + try: + if ssock is not None: + ssock.close() + except Exception: + pass + try: + if sock is not None: + sock.close() + except Exception: + pass + + return supported, selected_cipher, elapsed, error_text + + def probe(self, target): + """ + Probe the target (URL or hostname or 'hostname:port') for TLS 1.0/1.1/1.2/1.3 support. + Returns TLSProbeResult. 
+ """ + result = TLSProbeResult() + host, port = self._parse_target_to_host_port(target) + result.hostname = host + result.port = port + + if host is None: + result.errors.append("Unable to parse a hostname from the target.") + return result + + # Define the versions we will test, in ascending order + versions_to_test = ["TLS1.0", "TLS1.1", "TLS1.2", "TLS1.3"] + + # Iterate explicitly to match your coding style preference + for version_label in versions_to_test: + context = self._build_context_for_version(version_label) + + # If this Python/OpenSSL cannot restrict to this version, mark as unsupported_by_runtime + if context is None: + version_outcome = { + "supported": False, + "selected_cipher": None, + "handshake_seconds": None, + "error": "Version not supported by local runtime" + } + result.results_by_version[version_label] = version_outcome + continue + + supported, cipher, elapsed, err = self._attempt_handshake(host, port, context) + + version_outcome = { + "supported": supported, + "selected_cipher": cipher, + "handshake_seconds": elapsed, + "error": err + } + result.results_by_version[version_label] = version_outcome + + # Determine weak protocols (if the handshake succeeded on legacy versions) + # RFC 8996 and industry guidance deprecate TLS 1.0 and 1.1. + try: + v10 = result.results_by_version.get("TLS1.0") + if v10 is not None and v10.get("supported") is True: + result.weak_protocols.append("TLS1.0") + except Exception: + pass + + try: + v11 = result.results_by_version.get("TLS1.1") + if v11 is not None and v11.get("supported") is True: + result.weak_protocols.append("TLS1.1") + except Exception: + pass + + # Flag weak ciphers encountered in any successful negotiation + # This is a heuristic: we only see the single chosen cipher per version. 
+ try: + for label in ["TLS1.0", "TLS1.1", "TLS1.2", "TLS1.3"]: + outcome = result.results_by_version.get(label) + if outcome is None: + continue + if outcome.get("supported") is not True: + continue + + cipher_name = outcome.get("selected_cipher") + if cipher_name is None: + continue + + # Simple string-based checks for known-weak families + # (RC4, 3DES, NULL, EXPORT, MD5). Expand as needed. + name_upper = str(cipher_name).upper() + is_weak = False + + if "RC4" in name_upper: + is_weak = True + elif "3DES" in name_upper or "DES-CBC3" in name_upper: + is_weak = True + elif "NULL" in name_upper: + is_weak = True + elif "EXPORT" in name_upper or "EXP-" in name_upper: + is_weak = True + elif "-MD5" in name_upper: + is_weak = True + + if is_weak: + # Avoid duplicates + if cipher_name not in result.weak_ciphers: + result.weak_ciphers.append(cipher_name) + except Exception as exc: + result.errors.append(f"Cipher analysis error: {exc}") + + return result + + def _parse_target_to_host_port(self, target): + """ + Accepts URL, hostname, or 'hostname:port' and returns (hostname, port). + Defaults to port 443 if not specified. 
+ """ + if target is None: + return None, 443 + + text = str(target).strip() + if text == "": + return None, 443 + + # If it's clearly a URL, parse it normally + if "://" in text: + parsed = urlparse(text) + hostname = parsed.hostname + port = parsed.port + if hostname is None: + return None, 443 + if port is None: + port = 443 + return hostname.lower(), int(port) + + # If it's host:port, split safely + # Note: URLs without scheme can be tricky (IPv6), but we'll handle [::1]:443 form later if needed + if ":" in text and text.count(":") == 1: + host_part, port_part = text.split(":") + host_part = host_part.strip() + port_part = port_part.strip() + if host_part == "": + return None, 443 + try: + port_value = int(port_part) + except Exception: + port_value = 443 + return host_part.lower(), int(port_value) + + # Otherwise treat it as a bare hostname + return text.lower(), 443 diff --git a/app/utils/url_tools.py b/app/utils/url_tools.py new file mode 100644 index 0000000..92805a2 --- /dev/null +++ b/app/utils/url_tools.py @@ -0,0 +1,133 @@ +# app/utils/url_tools.py +from urllib.parse import urlparse, urlunparse +import requests +import idna + +# Reuse the existing singleton_loader decorator from app.utils.settings + +from app.utils.settings import singleton_loader + + +class URLNormalizer: + """ + Normalize user input into a fully-qualified URL for analysis. + + Behavior: + - If no scheme is present, prepend https:// by default. + - Optional quick HTTPS reachability check with fallback to http://. + - Converts Unicode hostnames to punycode via IDNA. + + Notes: + - Keep the first-constructed configuration stable via the singleton factory. + - Avoids Flask/current_app/threading per your project style. 
+ """ + + def __init__(self, prefer_https: bool = True, fallback_http: bool = False, connect_timeout: float = 2.0): + self.prefer_https = bool(prefer_https) + self.fallback_http = bool(fallback_http) + self.connect_timeout = float(connect_timeout) + + def normalize_for_analysis(self, raw_input: str) -> str: + """ + Convert raw input (URL or domain) into a normalized URL string. + + Raises: + ValueError: if input is empty/invalid. + """ + if raw_input is None: + raise ValueError("Empty input") + + text = str(raw_input).strip() + if text == "": + raise ValueError("Empty input") + + # Repair common typos (missing colon) + lower = text.lower() + if lower.startswith("http//"): + text = "http://" + text[6:] + elif lower.startswith("https//"): + text = "https://" + text[7:] + + # Respect an existing scheme + if "://" in text: + parsed = urlparse(text) + return self._recompose_with_punycode_host(parsed) + + # No scheme -> build one + if self.prefer_https: + https_url = "https://" + text + if self.fallback_http: + if self._quick_https_ok(https_url): + return self._recompose_with_punycode_host(urlparse(https_url)) + http_url = "http://" + text + return self._recompose_with_punycode_host(urlparse(http_url)) + return self._recompose_with_punycode_host(urlparse(https_url)) + + http_url = "http://" + text + return self._recompose_with_punycode_host(urlparse(http_url)) + + def _recompose_with_punycode_host(self, parsed): + """ + Recompose a parsed URL with hostname encoded to ASCII (punycode). + Preserves userinfo, port, path, params, query, fragment. 
+ """ + host = parsed.hostname + if host is None: + return urlunparse(parsed) + + try: + ascii_host = idna.encode(host).decode("ascii") + except Exception: + ascii_host = host + + # rebuild netloc (auth + port) + netloc = ascii_host + if parsed.port: + netloc = f"{netloc}:{parsed.port}" + if parsed.username: + if parsed.password: + netloc = f"{parsed.username}:{parsed.password}@{netloc}" + else: + netloc = f"{parsed.username}@{netloc}" + + return urlunparse(( + parsed.scheme, + netloc, + parsed.path or "", + parsed.params or "", + parsed.query or "", + parsed.fragment or "", + )) + + def _quick_https_ok(self, https_url: str) -> bool: + """ + Quick reachability check for https:// using a HEAD request. + Redirects allowed; TLS verify disabled — posture-only. + """ + try: + resp = requests.head(https_url, allow_redirects=True, timeout=self.connect_timeout, verify=False) + _ = resp.status_code + return True + except Exception: + return False + + +# ---- Singleton factory using our decorator ---- +@singleton_loader +def get_url_normalizer( + prefer_https: bool = True, + fallback_http: bool = False, + connect_timeout: float = 2.0, +) -> URLNormalizer: + """ + Return the singleton URLNormalizer instance. + + IMPORTANT: With this decorator, the FIRST call's arguments "win". + Later calls return the cached instance and ignore new arguments. + + """ + return URLNormalizer( + prefer_https=prefer_https, + fallback_http=fallback_http, + connect_timeout=connect_timeout, + ) diff --git a/app/wsgi.py b/app/wsgi.py index 745c665..35f38ba 100644 --- a/app/wsgi.py +++ b/app/wsgi.py @@ -8,3 +8,14 @@ from . 
import create_app # Gunicorn will look for "app" app = create_app() + +from app.state import set_rules_engine, get_rules_engine +from app.logging_setup import get_app_logger +from app.rules.factory import build_rules_engine + +# Preload path: build once, set into global state +_engine = build_rules_engine() +set_rules_engine(_engine) + +logger = get_app_logger() +logger.info("[wsgi] engine id=%s total=%d", hex(id(_engine)), len(_engine.rules)) diff --git a/entrypoint.sh b/entrypoint.sh index 9113bf9..5c2970f 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -19,4 +19,5 @@ exec gunicorn \ --worker-class gthread \ --timeout 300 \ --graceful-timeout 300 \ + --preload \ "app.wsgi:app" diff --git a/requirements.txt b/requirements.txt index 10ee22a..ff39185 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,16 @@ -Flask>=3.0.3 -Jinja2>=3.1.4 -Werkzeug>=3.0.3 -itsdangerous>=2.2.0 -click>=8.1.7 -lxml>=5.3.0 +Flask>=3.0.3 # flask +Jinja2>=3.1.4 # flask dependency +Werkzeug>=3.0.3 # flask dependency +itsdangerous>=2.2.0 # flask dependency +click>=8.1.7 # flask dependency +idna # IDNA/punycode hostname encoding (used by app/utils/url_tools.py) +gunicorn>=22.0.0 # Flask Production server +lxml>=5.3.0 # xml parsing playwright==1.45.0 # Playwright stack beautifulsoup4>=4.12.3 # HTML parsing, etc. -gunicorn>=22.0.0 # Production server python-whois # For WHOIS lookups -geoip2 # MaxMind GeoLite2 database for IP geolocation +# geoip2 # MaxMind GeoLite2 database for IP geolocation dnspython # For DNS lookups, including A/AAAA records -ipwhois -PyYAML \ No newline at end of file +ipwhois # IP parsing +PyYAML # ability to load yaml files +tldextract # TLD extraction to find the registrable/root domain \ No newline at end of file