- Add SSL/TLS intelligence pipeline:
- crt.sh lookup with expired-certificate filtering and root-domain wildcard resolution
- live TLS version/cipher probe with weak/legacy flags and probe notes (sketched after this list)
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data
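A minimal sketch of the two lookups, assuming the public crt.sh JSON endpoint and only the stdlib `ssl` module; the helper names (`fetch_ct_certs`, `probe_tls`) and result fields are illustrative, not the repo's actual API:

```python
# Illustrative only: hypothetical helpers approximating the pipeline's two probes.
import json
import socket
import ssl
from datetime import datetime, timezone
from urllib.request import urlopen

LEGACY_VERSIONS = {"SSLv3", "TLSv1", "TLSv1.1"}

def fetch_ct_certs(domain: str) -> list[dict]:
    """Query crt.sh CT logs for a domain, dropping expired certificates."""
    # For wildcard/subdomain coverage, the query can be "%.domain" (URL-encoded "%25.").
    url = f"https://crt.sh/?q={domain}&output=json"
    with urlopen(url, timeout=15) as resp:
        entries = json.load(resp)
    now = datetime.now(timezone.utc)
    live = []
    for e in entries:
        # crt.sh returns naive ISO timestamps, e.g. "2026-01-15T12:34:56"
        not_after = datetime.fromisoformat(e["not_after"]).replace(tzinfo=timezone.utc)
        if not_after >= now:
            live.append(e)
    return live

def probe_tls(host: str, port: int = 443) -> dict:
    """Record the negotiated TLS version/cipher and flag legacy protocols."""
    ctx = ssl.create_default_context()
    # To detect servers that still *accept* legacy protocols, a real probe would
    # also retry with ctx.minimum_version lowered to ssl.TLSVersion.TLSv1.
    with socket.create_connection((host, port), timeout=10) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as tls:
            version = tls.version()              # e.g. "TLSv1.3"
            cipher_name, _, bits = tls.cipher()  # e.g. ("TLS_AES_256_GCM_SHA384", ..., 256)
    return {"version": version, "cipher": cipher_name, "bits": bits,
            "legacy": version in LEGACY_VERSIONS}
```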
- Introduce `URLNormalizer` with punycode support and typo repair (sketch after this list)
- Auto-prepend `https://` for bare domains (e.g., `google.com`)
- Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
- `get_url_normalizer()` reads defaults from Settings (if present)
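A hedged sketch of the normalizer surface: the class and `get_url_normalizer()` names come from this change, but the internals below are assumptions, and `functools.lru_cache` stands in for the repo's `@singleton_loader`:

```python
# Illustrative only: a plausible shape for URLNormalizer, not the repo's code.
from functools import lru_cache
from urllib.parse import urlsplit, urlunsplit

class URLNormalizer:
    def __init__(self, default_scheme: str = "https"):
        self.default_scheme = default_scheme

    def normalize(self, raw: str) -> str:
        raw = raw.strip()
        if "://" not in raw:                             # bare domain: "google.com"
            raw = f"{self.default_scheme}://{raw}"
        parts = urlsplit(raw)
        host = parts.hostname or ""
        try:
            host = host.encode("idna").decode("ascii")   # punycode for IDN hosts
        except UnicodeError:
            pass                                         # leave as-is; rules can flag it
        netloc = host + (f":{parts.port}" if parts.port else "")
        # Typo repair and the optional HTTPS reachability / http:// fallback
        # checks are omitted from this sketch.
        return urlunsplit((parts.scheme.lower(), netloc, parts.path,
                           parts.query, parts.fragment))

@lru_cache(maxsize=1)   # stand-in for the repo's @singleton_loader
def get_url_normalizer() -> URLNormalizer:
    return URLNormalizer()  # the real version reads defaults from Settings
```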
- Standardize function-rule return shape to `(bool, dict|None)` across `form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.) as in the example below
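For illustration, a `form_*` rule following the `(bool, dict|None)` convention; the fact keys used here are hypothetical:

```python
# Hypothetical rule showing the standardized (bool, dict | None) return shape.
def form_posts_to_external_host(facts: dict) -> tuple[bool, dict | None]:
    page_host = facts.get("page_host")
    external = [h for h in facts.get("form_action_hosts", [])
                if h and h != page_host]
    if external:
        # Match: structured payload instead of a bare string
        return True, {"note": "form action posts off-site", "hosts": external}
    return False, None  # miss: no payload
```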
- Harden `FunctionRuleAdapter` (coercion sketched below):
- Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs
- Adapt non-dict inputs to facts (category-aware and via provided adapter)
- Return `(True, dict)` on match, `(False, None)` on miss
- Bind-time logging with file:line + function id for diagnostics
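A sketch of the coercion step only, assuming the legacy shapes listed above; fact adaptation and bind-time logging are elided:

```python
# Illustrative coercion of legacy rule returns into the normalized shape.
def _normalize_result(result) -> tuple[bool, dict | None]:
    if isinstance(result, bool):                       # legacy: bare bool
        return (result, {} if result else None)
    if isinstance(result, tuple) and len(result) == 2:
        matched, payload = result
        if isinstance(payload, str):                   # legacy: (bool, str)
            payload = {"note": payload}
        return (True, payload or {}) if matched else (False, None)
    return (False, None)                               # unknown shape: treat as miss
```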
- `RuleEngine` (sketch below):
- Back rules by private `self._rules`; `rules` property returns copy
- Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
- Fix AttributeError from property assignment during `__init__`
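A minimal sketch of that contract; attribute names other than `_rules`, `rules`, and `add_rule` are assumptions:

```python
# Sketch: private backing list, copy-returning property, idempotent add_rule.
import re

class RuleEngine:
    def __init__(self):
        # Assigning to self.rules here would hit the read-only property
        # (the AttributeError this change fixes); write to the backing field.
        self._rules = []

    @property
    def rules(self) -> list:
        return list(self._rules)          # copy: callers cannot mutate engine state

    def add_rule(self, rule, replace: bool = False) -> None:
        if isinstance(getattr(rule, "pattern", None), str):
            rule.compiled = re.compile(rule.pattern)   # (re)compile regex rules
        for i, existing in enumerate(self._rules):
            if existing.name == rule.name:
                if replace:
                    self._rules[i] = rule              # in-place replacement
                return                                 # idempotent: no duplicates
        self._rules.append(rule)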
- Replace hidden singleton factory with explicit builder + global state (sketch below):
- `app/rules/factory.py::build_rules_engine()` builds and logs totals
- `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the single source of truth
- `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts
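The moving parts, sketched under the module names above (the bodies are assumptions; only the function names, env var, and preload timing come from this change):

```python
# app/state.py -- process-global holder; consumers read only through this.
_rules_engine = None

def set_rules_engine(engine) -> None:
    global _rules_engine
    _rules_engine = engine

def get_rules_engine():
    if _rules_engine is None:
        raise RuntimeError("rules engine not published; build it at preload")
    return _rules_engine

# app/wsgi.py -- build exactly once under gunicorn --preload, then publish.
import os
from app.rules.factory import build_rules_engine
from app.state import set_rules_engine

engine = build_rules_engine()
if os.environ.get("SS_DEBUG_RULES") == "1":
    # Debug hook: trace which engine instance the workers will share
    print(f"[rules] engine id={id(engine)} rule_count={len(engine.rules)}")
set_rules_engine(engine)
```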
- Unify logging wiring (sketch below):
- `wire_logging_once(app)` clears and attaches a single handler chain
- Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
- Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)
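A sketch of the idempotent wiring, assuming a single stdout handler; only the logger names, the pid/name format fields, and the no-propagation rule come from the change itself:

```python
# Illustrative single-chain wiring for the two named loggers plus Flask's.
import logging
import sys

def wire_logging_once(app) -> None:
    fmt = logging.Formatter(
        "%(asctime)s pid=%(process)d %(name)s %(levelname)s: %(message)s")
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(fmt)
    for log in (logging.getLogger("sneakyscope.app"),
                logging.getLogger("sneakyscope.engine"),
                app.logger):
        log.handlers.clear()       # drop stray/duplicate handlers
        log.addHandler(handler)
        log.setLevel(logging.INFO)
        log.propagate = False      # no bubbling to root, so no duplicate lines
```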
- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Document the foreground vs background log “double consumer” caveat (attach vs `compose logs`)
- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section
- Consumers should import the rules engine via:
- `from app.state import get_rules_engine`
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
  then publish with `set_rules_engine()`. Do not call old singleton factories. See the snippet below.
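Consumer-side, that reduces to (the `evaluate` call is a hypothetical method name):

```python
from app.state import get_rules_engine

def check(facts: dict):
    engine = get_rules_engine()     # the instance published at preload
    return engine.evaluate(facts)   # hypothetical evaluation entry point
```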
- New/changed modules (high level):
- `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
- `app/rules/function_rules.py` (±) — normalized payload returns
- `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
- `app/utils/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
- `app/rules/factory.py` (±) — pure builder; totals logged post-registration
- `app/state.py` (+) — process-global rules engine
- `app/logging_setup.py` (±) — single chain, two named loggers
- `app/wsgi.py` (±) — preload build + `set_rules_engine()`
- `entrypoint.sh` (±) — add `--preload`
- templates (±) — TLS card, raw toggle; front-page checkbox
Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.
import json
from pathlib import Path
from datetime import datetime

from app.logging_setup import get_app_logger

logger = get_app_logger()


def safe_write(path: Path | str, content: str, mode="w", encoding="utf-8"):
    """Write content to a file safely with logging."""
    path = Path(path)
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, mode, encoding=encoding) as f:
            f.write(content)
        logger.info(f"[+] Wrote file: {path}")
    except Exception as e:
        logger.error(f"[!] Failed writing {path}: {e}")
        raise


def get_recent_results(storage_dir: Path, limit: int, logger) -> list[dict]:
    """
    Scan the SANDBOX_STORAGE directory for run folders (UUIDs), read each
    run's results.json, and return the most recent N entries by file mtime.

    Args:
        storage_dir (Path): Base path where UUID run directories live.
        limit (int): Maximum number of recent items to return.
        logger: Flask or stdlib logger to record non-fatal issues.

    Returns:
        list[dict]: Each item includes:
            {
                "uuid": str,
                "submitted_url": str | None,
                "final_url": str | None,
                "timestamp": str (ISO 8601),
            }
        Returns an empty list if no runs are found or on error.
    """
    items = []

    try:
        # Ensure the storage dir exists
        storage_dir.mkdir(parents=True, exist_ok=True)

        # Iterate directories directly under storage_dir
        for entry in storage_dir.iterdir():
            try:
                if not entry.is_dir():
                    continue  # skip non-directories

                # Expect results.json inside each UUID directory
                results_path = entry / "results.json"
                if not results_path.exists():
                    continue  # skip folders without results.json

                # Read file metadata (mtime) for sorting and display
                mtime_epoch = results_path.stat().st_mtime
                mtime_iso = datetime.fromtimestamp(mtime_epoch).isoformat(timespec="seconds")

                # Parse a small subset of the JSON for display
                submitted_url = None
                final_url = None
                try:
                    with open(results_path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    if isinstance(data, dict):
                        submitted_url = data.get("submitted_url")
                        final_url = data.get("final_url")
                except Exception as read_err:
                    # If JSON is malformed or unreadable, log and continue
                    if logger:
                        logger.warning(f"[recent] Failed reading {results_path}: {read_err}")

                items.append((mtime_epoch, {
                    "uuid": entry.name,
                    "submitted_url": submitted_url,
                    "final_url": final_url,
                    "timestamp": mtime_iso,
                }))
            except Exception as inner_err:
                # Keep going; a single bad folder should not break the list
                if logger:
                    logger.warning(f"[recent] Skipping {entry}: {inner_err}")

        # Sort newest first by mtime, then trim to the requested limit
        items.sort(key=lambda t: t[0], reverse=True)
        return [item for _, item in items[:limit]]

    except Exception as outer_err:
        if logger:
            logger.error(f"[recent] Unexpected error while scanning {storage_dir}: {outer_err}")
        return []