Phillip Tarrant 693f7d67b9 feat: HTTPS auto-normalization; robust TLS intel UI; global rules state; clean logging; preload
- Add SSL/TLS intelligence pipeline:
  - crt.sh lookup with expired-filtering and root-domain wildcard resolution
  - live TLS version/cipher probe with weak/legacy flags and probe notes (see the probe sketch below)
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data

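A minimal sketch of what the live TLS probe could look like using only the standard library; the function name, flag set, and return shape are illustrative, not the repo's API, and the real probe also records probe notes and relaxes the context to detect legacy protocols:

```python
import socket
import ssl

LEGACY_VERSIONS = {"SSLv3", "TLSv1", "TLSv1.1"}  # treated as weak/legacy here

def probe_tls(host: str, port: int = 443, timeout: float = 5.0) -> dict:
    """Connect once and report the negotiated TLS version and cipher suite."""
    context = ssl.create_default_context()
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with context.wrap_socket(sock, server_hostname=host) as tls:
            version = tls.version()                  # e.g. "TLSv1.3"
            cipher_name, _proto, bits = tls.cipher()
            return {
                "version": version,
                "cipher": cipher_name,
                "bits": bits,
                "legacy": version in LEGACY_VERSIONS,
            }

# probe_tls("example.com") -> {"version": "TLSv1.3", "cipher": "...", "bits": 256, "legacy": False}
```
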
- Introduce `URLNormalizer` with punycode support and typo repair
  - Auto-prepend `https://` for bare domains (e.g., `google.com`)
  - Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
  - `get_url_normalizer()` reads defaults from Settings (if present); see the sketch below

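A sketch of the normalizer/singleton shape described above, with `functools.lru_cache` standing in for the repo's `@singleton_loader` and only the https auto-prepend shown (class internals are assumptions):

```python
from functools import lru_cache
from urllib.parse import urlparse

class URLNormalizer:
    """Prepend a scheme to bare domains; the real class also handles punycode and typo repair."""

    def __init__(self, default_scheme: str = "https") -> None:
        self.default_scheme = default_scheme

    def normalize(self, raw: str) -> str:
        candidate = raw.strip()
        if not urlparse(candidate).scheme:           # bare domain like "google.com"
            candidate = f"{self.default_scheme}://{candidate}"
        return candidate

@lru_cache(maxsize=1)                                # function-cached singleton
def get_url_normalizer() -> URLNormalizer:
    return URLNormalizer()

# get_url_normalizer().normalize("google.com") -> "https://google.com"
```
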
- Standardize function-rule return shape to `(bool, dict|None)` across
  `form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.)
- Harden `FunctionRuleAdapter`:
  - Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs (sketched below)
  - Adapt non-dict inputs to facts (category-aware and via provided adapter)
  - Return `(True, dict)` on match, `(False, None)` on miss
  - Bind-time logging with file:line + function id for diagnostics
- `RuleEngine`:
  - Back rules by private `self._rules`; `rules` property returns copy
  - Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
  - Fix AttributeError from property assignment during `__init__`

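The coercion the adapter performs might look roughly like this; names and payload keys other than `note` are illustrative:

```python
from typing import Any, Optional, Tuple

def coerce_rule_result(raw: Any) -> Tuple[bool, Optional[dict]]:
    """Normalize legacy rule returns to the (bool, dict|None) contract."""
    if isinstance(raw, tuple):
        matched = bool(raw[0])
        payload = raw[1] if len(raw) > 1 else None
        if matched and not isinstance(payload, dict):
            # Legacy (bool, str) style: wrap the string as a structured note.
            payload = {"note": str(payload)} if payload is not None else {}
        return (matched, payload if matched else None)
    if isinstance(raw, bool):
        # Legacy bare-bool style: match with an empty structured payload.
        return (raw, {} if raw else None)
    return (False, None)

# coerce_rule_result((True, "suspicious form action")) -> (True, {"note": "suspicious form action"})
# coerce_rule_result(False)                            -> (False, None)
```
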
- Replace hidden singleton factory with explicit builder + global state:
  - `app/rules/factory.py::build_rules_engine()` builds and logs totals
  - `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the single source of truth (see the sketch below)
  - `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts

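A minimal sketch of the `app/state.py` publish/read pattern described above (the guard message and type hints are assumptions):

```python
# app/state.py (sketch)
from typing import Optional

_RULES_ENGINE: Optional[object] = None   # set once at preload, read by workers

def set_rules_engine(engine: object) -> None:
    """Publish the engine built during preload as the process-global instance."""
    global _RULES_ENGINE
    _RULES_ENGINE = engine

def get_rules_engine() -> object:
    """Return the published engine; fail loudly if preload never ran."""
    if _RULES_ENGINE is None:
        raise RuntimeError("rules engine not initialized; call set_rules_engine() at preload")
    return _RULES_ENGINE
```
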
- Unify logging wiring:
  - `wire_logging_once(app)` clears and attaches a single handler chain (see the sketch below)
  - Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
  - Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)

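A sketch of the single-chain wiring, assuming the logger names and format fields from the bullets above; handler choice and levels are assumptions:

```python
import logging

_FORMAT = "%(asctime)s pid=%(process)d %(name)s %(levelname)s %(message)s"

def wire_logging_once(app) -> None:
    """Clear stray handlers and attach one chain to the two named loggers.

    `app` is accepted for signature parity with the Flask factory; unused here.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(_FORMAT))
    for name in ("sneakyscope.app", "sneakyscope.engine"):
        logger = logging.getLogger(name)
        logger.handlers.clear()      # drop stray/duplicate handlers
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False     # prevent duplicate lines via the root logger
```
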
- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Documented foreground vs background log “double consumer” caveat (attach vs `compose logs`)

- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section

- Consumers should import the rules engine via:
  - `from app.state import get_rules_engine` (usage example below)
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
  then publish with `set_rules_engine()`. Do not call old singleton factories.

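For example, a consumer-side read might look like this; the handler function is hypothetical, and `run_all` matches how the API blueprint below calls the engine:

```python
from app.state import get_rules_engine

def analyze_text(text: str, category: str = "script"):
    engine = get_rules_engine()        # reads the instance published at preload
    return engine.run_all(text, category=category)
```
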
- New/changed modules (high level):
  - `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
  - `app/rules/function_rules.py` (±) — normalized payload returns
  - `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
  - `app/utils/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
  - `app/rules/factory.py` (±) — pure builder; totals logged post-registration
  - `app/state.py` (+) — process-global rules engine
  - `app/logging_setup.py` (±) — single chain, two named loggers
  - `app/wsgi.py` (±) — preload build + `set_rules_engine()`
  - `entrypoint.sh` (±) — add `--preload`
  - templates (±) — TLS card, raw toggle; front-page checkbox

Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.


# app/blueprints/api.py
"""
API blueprint for JSON endpoints.

Endpoints:
    POST /api/analyze_script
        Body:
            {
              "job_id": "<uuid>",          # or "uuid": "<uuid>"
              "url": "https://cdn.example.com/app.js",
              "category": "script"         # optional, defaults to "script"
            }
        Response:
            {
              "ok": true,
              "final_url": "...",
              "status_code": 200,
              "bytes": 12345,
              "truncated": false,
              "sha256": "...",
              "artifact_path": "/abs/path/to/<uuid>/scripts/fetched/<index>.js",
              "findings": [ { "name": "...", "description": "...", "severity": "...", "tags": [...], "reason": "..." }, ... ],
              "snippet": "<trimmed content>",
              "snippet_len": 45678
            }
"""
import os
import time
from pathlib import Path

from flask import Blueprint, request, jsonify, current_app, send_file, abort
from werkzeug.exceptions import HTTPException

from app.logging_setup import get_app_logger
from app.utils.settings import get_settings
from app.utils.external_fetcher import ExternalScriptFetcher

api_bp = Blueprint("api", __name__, url_prefix="/api")
app_logger = get_app_logger()


def _resolve_results_path(job_id: str) -> str:
    """
    Compute the absolute results directory for a given job UUID.
    Prefers <BASE>/artifacts/<uuid>, falls back to <BASE>/<uuid>.
    """
    base_dir = "/data"
    candidate_with_artifacts = os.path.join(base_dir, "artifacts", job_id)
    if os.path.isdir(candidate_with_artifacts):
        return candidate_with_artifacts
    fallback = os.path.join(base_dir, job_id)
    os.makedirs(fallback, exist_ok=True)
    return fallback


def _make_snippet(text: str, max_chars: int = 1200) -> str:
    """Produce a trimmed, safe-to-render snippet of the script contents."""
    if not text:
        return ""
    snippet = text.strip()
    return (snippet[:max_chars] + "…") if len(snippet) > max_chars else snippet


@api_bp.errorhandler(400)
@api_bp.errorhandler(403)
@api_bp.errorhandler(404)
@api_bp.errorhandler(405)
def _api_err(err):
    """
    Return JSON for common client errors.
    """
    if isinstance(err, HTTPException):
        code = err.code
        name = (err.name or "error").lower()
    else:
        code = 400
        name = "error"
    return jsonify({"ok": False, "error": name}), code


@api_bp.errorhandler(500)
def _api_500(err):
    """
    Return JSON for server errors and log the exception.
    """
    try:
        app_logger.exception("API 500")
    except Exception:
        pass
    return jsonify({"ok": False, "error": "internal server error"}), 500


@api_bp.post("/analyze_script")
def analyze_script():
    """
    Analyze EXACTLY one external script URL for a given job UUID.

    Expected JSON body:
        { "job_id": "<uuid>", "url": "https://cdn.example.com/app.js", "category": "script" }
    """
    body = request.get_json(silent=True) or {}
    job_id_raw = body.get("job_id") or body.get("uuid")
    script_url_raw = body.get("url")
    category = (body.get("category") or "script").strip() or "script"  # default to "script"

    job_id = (job_id_raw or "").strip() if isinstance(job_id_raw, str) else ""
    script_url = (script_url_raw or "").strip() if isinstance(script_url_raw, str) else ""

    # log this request
    app_logger.info(f"Got request to analyze {script_url} via API")

    if not job_id or not script_url:
        return jsonify({"ok": False, "error": "Missing job_id (or uuid) or url"}), 400

    settings = get_settings()
    if not settings.external_fetch.enabled:
        return jsonify({"ok": False, "error": "Feature disabled"}), 400

    # Resolve the UUID-backed results directory for this run.
    results_path = _resolve_results_path(job_id)

    # Initialize the fetcher; it reads its own settings internally.
    fetcher = ExternalScriptFetcher(results_path=results_path)

    # Unique index for the saved file name: <results_path>/scripts/fetched/<index>.js
    unique_index = int(time.time() * 1000)
    outcome = fetcher.fetch_one(script_url=script_url, index=unique_index)

    if not outcome.ok or not outcome.saved_path:
        return jsonify({
            "ok": False,
            "error": outcome.reason,
            "status_code": outcome.status_code,
            "final_url": outcome.final_url
        }), 502

    # Read bytes and decode to UTF-8 for rules and snippet
    try:
        with open(outcome.saved_path, "rb") as fh:
            js_text = fh.read().decode("utf-8", errors="ignore")
    except Exception:
        js_text = ""

    # Pull the rules engine from the app (prefer attribute, then config).
    findings = []
    try:
        engine = getattr(current_app, "rule_engine", None)
        if engine is None:
            engine = current_app.config.get("RULE_ENGINE")
    except Exception:
        engine = None

    if engine is not None and hasattr(engine, "run_all"):
        try:
            # run_all returns PASS/FAIL for each rule; we only surface FAIL (matched) to the UI
            all_results = engine.run_all(js_text, category=category)
            if isinstance(all_results, list):
                matched = []
                for r in all_results:
                    try:
                        if r.get("result") == "FAIL":
                            matched.append({
                                "name": r.get("name"),
                                "description": r.get("description"),
                                "severity": r.get("severity"),
                                "tags": r.get("tags") or [],
                                "reason": r.get("reason"),
                                "category": r.get("category"),
                            })
                    except Exception:
                        # Ignore malformed entries
                        continue
                findings = matched
        except Exception as exc:
            try:
                app_logger.error("Rule engine error", extra={"error": str(exc)})
            except Exception:
                pass
            findings = []

    snippet = _make_snippet(js_text, max_chars=settings.ui.snippet_preview_len)

    return jsonify({
        "ok": True,
        "final_url": outcome.final_url,
        "status_code": outcome.status_code,
        "bytes": outcome.bytes_fetched,
        "truncated": outcome.truncated,
        "sha256": outcome.sha256_hex,
        "artifact_path": outcome.saved_path,
        "findings": findings,  # only FAILed rules
        "snippet": snippet,
        "snippet_len": len(js_text)
    })


@api_bp.get("/artifacts/<run_uuid>/<filename>")
def get_artifact_raw(run_uuid, filename):
    # prevent path traversal
    if "/" in filename or ".." in filename:
        abort(400)

    run_dir = _resolve_results_path(run_uuid)
    full_path = Path(run_dir) / filename

    # if file is not there, give a 404
    if not os.path.isfile(full_path):
        abort(404)

    # else return file
    return send_file(full_path, as_attachment=False)