SneakyScope/app/utils/tls_probe.py
Phillip Tarrant 693f7d67b9 feat: HTTPS auto-normalization; robust TLS intel UI; global rules state; clean logging; preload
- Add SSL/TLS intelligence pipeline:
  - crt.sh lookup with expired-filtering and root-domain wildcard resolution
  - live TLS version/cipher probe with weak/legacy flags and probe notes
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data

- Introduce `URLNormalizer` with punycode support and typo repair
  - Auto-prepend `https://` for bare domains (e.g., `google.com`)
  - Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
  - `get_url_normalizer()` reads defaults from Settings (if present)
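
A minimal sketch of the auto-prepend and punycode steps, assuming only the standard library. `normalize_url` is a hypothetical name for illustration, not the `URLNormalizer` API, and it leaves out typo repair and the reachability fallback:

```python
from urllib.parse import urlsplit, urlunsplit


def normalize_url(raw: str, default_scheme: str = "https") -> str:
    """Hypothetical helper: add a scheme to bare domains and punycode the host."""
    text = raw.strip()
    if "://" not in text:
        # Bare domain such as "google.com": assume HTTPS.
        text = f"{default_scheme}://{text}"
    parts = urlsplit(text)
    host = parts.hostname or ""
    # The stdlib "idna" codec converts internationalized labels to punycode.
    ascii_host = host.encode("idna").decode("ascii")
    # Note: any userinfo (user:pass@) is dropped in this sketch.
    netloc = ascii_host if parts.port is None else f"{ascii_host}:{parts.port}"
    return urlunsplit(
        (parts.scheme.lower(), netloc, parts.path, parts.query, parts.fragment)
    )


print(normalize_url("google.com"))
print(normalize_url("http://bücher.de/x"))
```

The HTTPS reachability check with `http://` fallback would sit on top of this and needs network access, so it is not shown here.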

- Standardize function-rule return shape to `(bool, dict|None)` across
  `form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.)
- Harden `FunctionRuleAdapter`:
  - Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs
  - Adapt non-dict inputs to facts (category-aware and via provided adapter)
  - Return `(True, dict)` on match, `(False, None)` on miss
  - Bind-time logging with file:line + function id for diagnostics
- `RuleEngine`:
  - Back rules by private `self._rules`; `rules` property returns copy
  - Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
  - Fix AttributeError from property assignment during `__init__`
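
The legacy-return coercion described above could look roughly like this. `coerce_rule_result` is a hypothetical helper, and putting a bare string under the `note` key is an assumption; the real `FunctionRuleAdapter` may shape payloads differently:

```python
from typing import Any, Optional, Tuple

RuleOutcome = Tuple[bool, Optional[dict]]


def coerce_rule_result(raw: Any) -> RuleOutcome:
    """Hypothetical: normalize bool / (bool, str) / (bool, dict) returns."""
    payload: Any = None
    if isinstance(raw, tuple):
        matched = bool(raw[0]) if raw else False
        payload = raw[1] if len(raw) > 1 else None
    else:
        # Legacy rules that return a bare bool.
        matched = bool(raw)
    if not matched:
        # A miss never carries a payload.
        return False, None
    if isinstance(payload, dict):
        return True, payload
    if payload is None:
        return True, {}
    # Legacy (bool, str): wrap the message in a structured payload.
    return True, {"note": str(payload)}


print(coerce_rule_result((True, "inline script posts to external host")))
```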

- Replace hidden singleton factory with explicit builder + global state:
  - `app/rules/factory.py::build_rules_engine()` builds and logs totals
  - `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the single source of truth
  - `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts
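
A sketch of the process-global state pattern, under the assumption that `app/state.py` holds a single module-level reference. The lock and the `RuntimeError` on an unpublished engine are illustrative choices, not confirmed behavior:

```python
import threading
from typing import Any, Optional

# Module-level holder; assumed to be populated once during preload.
_rules_engine: Optional[Any] = None
_lock = threading.Lock()


def set_rules_engine(engine: Any) -> None:
    """Publish the engine built at preload time."""
    global _rules_engine
    with _lock:
        _rules_engine = engine


def get_rules_engine() -> Any:
    """Return the published engine, failing loudly if none was set."""
    engine = _rules_engine
    if engine is None:
        raise RuntimeError(
            "rules engine not published; call set_rules_engine() during preload"
        )
    return engine
```

Because the builder runs once before workers fork (with `--preload`), every worker inherits the same already-built instance instead of racing to construct its own.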

- Unify logging wiring:
  - `wire_logging_once(app)` clears and attaches a single handler chain
  - Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
  - Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)
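
As a rough illustration of the single-handler-chain idea: `wire_logging` below is a hypothetical stand-in for `wire_logging_once(app)` (it takes no app argument), and the format string is an assumption. Only the two logger names come from this commit:

```python
import logging
import sys

LOGGER_NAMES = ("sneakyscope.app", "sneakyscope.engine")


def wire_logging(level: int = logging.INFO) -> None:
    """Hypothetical: attach exactly one shared handler to each named logger."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s pid=%(process)d %(name)s %(levelname)s %(message)s"
        )
    )
    for name in LOGGER_NAMES:
        logger = logging.getLogger(name)
        # Drop any stray handlers so repeated calls never stack duplicates.
        logger.handlers.clear()
        logger.addHandler(handler)
        logger.setLevel(level)
        # Stop records from also reaching the root logger's handlers.
        logger.propagate = False


wire_logging()
logging.getLogger("sneakyscope.engine").info("rules engine ready")
```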

- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Documented foreground vs background log “double consumer” caveat (attach vs `compose logs`)

- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section

- Consumers should import the rules engine via:
  - `from app.state import get_rules_engine`
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
  then publish with `set_rules_engine()`. Do not call old singleton factories.

- New/changed modules (high level):
  - `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
  - `app/rules/function_rules.py` (±) — normalized payload returns
  - `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
  - `app/utils/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
  - `app/rules/factory.py` (±) — pure builder; totals logged post-registration
  - `app/state.py` (+) — process-global rules engine
  - `app/logging_setup.py` (±) — single chain, two named loggers
  - `app/wsgi.py` (±) — preload build + `set_rules_engine()`
  - `entrypoint.sh` (±) — add `--preload`
  - templates (±) — TLS card, raw toggle; front-page checkbox

Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.
2025-08-21 22:05:16 -05:00

271 lines
10 KiB
Python

import socket
import ssl
import time
import logging
from urllib.parse import urlparse


class TLSProbeResult:
    """
    Container for the results of a TLS probe across protocol versions.
    """

    def __init__(self):
        self.hostname = None
        self.port = 443
        # e.g., {"TLS1.3": {"supported": True, "selected_cipher": "TLS_AES_128_GCM_SHA256", ...}}
        self.results_by_version = {}
        self.weak_protocols = []  # e.g., ["TLS1.0", "TLS1.1"]
        self.weak_ciphers = []  # e.g., ["RC4-SHA"]
        self.errors = []  # textual errors encountered during probing

    def to_dict(self):
        """
        Convert the object to a serializable dictionary.
        """
        output = {
            "hostname": self.hostname,
            "port": self.port,
            "results_by_version": self.results_by_version,
            "weak_protocols": self.weak_protocols,
            "weak_ciphers": self.weak_ciphers,
            "errors": self.errors
        }
        return output

class TLSEnumerator:
    """
    Enumerate supported TLS versions for a server by attempting handshakes with constrained contexts.
    Also collects the server-selected cipher for each successful handshake.

    Notes:
    - We do NOT validate certificates; this is posture discovery, not trust verification.
    - Cipher enumeration is limited to "what was negotiated with default cipher list" per version.
      Deep cipher scanning (per-cipher attempts) can be added later if needed.
    """

    def __init__(self, timeout_seconds=5.0):
        self.timeout_seconds = float(timeout_seconds)

    def _build_context_for_version(self, tls_version_label):
        """
        Build an SSLContext that only allows the specified TLS version.
        """
        # Base client context
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

        # Disable certificate checks so we can probe misconfigured/self-signed endpoints
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        # Constrain to a single protocol version using minimum/maximum
        # Map label -> ssl.TLSVersion
        if tls_version_label == "TLS1.0" and hasattr(ssl.TLSVersion, "TLSv1"):
            context.minimum_version = ssl.TLSVersion.TLSv1
            context.maximum_version = ssl.TLSVersion.TLSv1
        elif tls_version_label == "TLS1.1" and hasattr(ssl.TLSVersion, "TLSv1_1"):
            context.minimum_version = ssl.TLSVersion.TLSv1_1
            context.maximum_version = ssl.TLSVersion.TLSv1_1
        elif tls_version_label == "TLS1.2" and hasattr(ssl.TLSVersion, "TLSv1_2"):
            context.minimum_version = ssl.TLSVersion.TLSv1_2
            context.maximum_version = ssl.TLSVersion.TLSv1_2
        elif tls_version_label == "TLS1.3" and hasattr(ssl.TLSVersion, "TLSv1_3"):
            context.minimum_version = ssl.TLSVersion.TLSv1_3
            context.maximum_version = ssl.TLSVersion.TLSv1_3
        else:
            # Version not supported by this Python/OpenSSL build
            return None

        # Keep default cipher list; we only want to see what is negotiated
        # You can later set context.set_ciphers("...") for deeper scans.
        return context

    def _attempt_handshake(self, hostname, port, context):
        """
        Attempt a TLS handshake to (hostname, port) using the given context.
        Returns a tuple: (supported(bool), selected_cipher(str or None), elapsed_seconds(float or None), error(str or None))
        """
        supported = False
        selected_cipher = None
        elapsed = None
        error_text = None

        # Create a TCP connection with a timeout
        sock = None
        ssock = None
        start = None
        try:
            # Resolve and connect
            # Note: create_connection will handle IPv4/IPv6 resolution
            sock = socket.create_connection((hostname, port), timeout=self.timeout_seconds)

            # Start the timer right before the TLS wrap so that mostly the handshake is measured.
            # A monotonic clock is used so wall-clock adjustments cannot skew the duration.
            start = time.monotonic()

            # SNI is important: pass server_hostname
            ssock = context.wrap_socket(sock, server_hostname=hostname)

            # Access negotiated cipher; returns (cipher_name, protocol, secret_bits)
            cipher_info = ssock.cipher()
            if cipher_info is not None and len(cipher_info) >= 1:
                selected_cipher = str(cipher_info[0])

            # Reaching this point means the handshake completed for this version
            supported = True
            elapsed = time.monotonic() - start
        except Exception as exc:
            # Capture the error for diagnostics
            error_text = f"{type(exc).__name__}: {str(exc)}"
            elapsed = None
        finally:
            # Clean up sockets
            try:
                if ssock is not None:
                    ssock.close()
            except Exception:
                pass
            try:
                if sock is not None:
                    sock.close()
            except Exception:
                pass

        return supported, selected_cipher, elapsed, error_text

    def probe(self, target):
        """
        Probe the target (URL or hostname or 'hostname:port') for TLS 1.0/1.1/1.2/1.3 support.
        Returns TLSProbeResult.
        """
        result = TLSProbeResult()
        host, port = self._parse_target_to_host_port(target)
        result.hostname = host
        result.port = port
        if host is None:
            result.errors.append("Unable to parse a hostname from the target.")
            return result

        # Define the versions we will test, in ascending order
        versions_to_test = ["TLS1.0", "TLS1.1", "TLS1.2", "TLS1.3"]

        # Probe each version separately and record one outcome per version
        for version_label in versions_to_test:
            context = self._build_context_for_version(version_label)

            # If this Python/OpenSSL build cannot restrict to this version,
            # record it as unsupported by the local runtime and move on
            if context is None:
                version_outcome = {
                    "supported": False,
                    "selected_cipher": None,
                    "handshake_seconds": None,
                    "error": "Version not supported by local runtime"
                }
                result.results_by_version[version_label] = version_outcome
                continue

            supported, cipher, elapsed, err = self._attempt_handshake(host, port, context)
            version_outcome = {
                "supported": supported,
                "selected_cipher": cipher,
                "handshake_seconds": elapsed,
                "error": err
            }
            result.results_by_version[version_label] = version_outcome

        # Determine weak protocols (if the handshake succeeded on legacy versions)
        # RFC 8996 and industry guidance deprecate TLS 1.0 and 1.1.
        try:
            v10 = result.results_by_version.get("TLS1.0")
            if v10 is not None and v10.get("supported") is True:
                result.weak_protocols.append("TLS1.0")
        except Exception:
            pass
        try:
            v11 = result.results_by_version.get("TLS1.1")
            if v11 is not None and v11.get("supported") is True:
                result.weak_protocols.append("TLS1.1")
        except Exception:
            pass

        # Flag weak ciphers encountered in any successful negotiation
        # This is a heuristic: we only see the single chosen cipher per version.
        try:
            for label in ["TLS1.0", "TLS1.1", "TLS1.2", "TLS1.3"]:
                outcome = result.results_by_version.get(label)
                if outcome is None:
                    continue
                if outcome.get("supported") is not True:
                    continue
                cipher_name = outcome.get("selected_cipher")
                if cipher_name is None:
                    continue

                # Simple string-based checks for known-weak families
                # (RC4, 3DES, NULL, EXPORT, MD5). Expand as needed.
                name_upper = str(cipher_name).upper()
                is_weak = False
                if "RC4" in name_upper:
                    is_weak = True
                elif "3DES" in name_upper or "DES-CBC3" in name_upper:
                    is_weak = True
                elif "NULL" in name_upper:
                    is_weak = True
                elif "EXPORT" in name_upper or "EXP-" in name_upper:
                    is_weak = True
                elif "-MD5" in name_upper:
                    is_weak = True

                if is_weak:
                    # Avoid duplicates
                    if cipher_name not in result.weak_ciphers:
                        result.weak_ciphers.append(cipher_name)
        except Exception as exc:
            result.errors.append(f"Cipher analysis error: {exc}")

        return result

    def _parse_target_to_host_port(self, target):
        """
        Accepts URL, hostname, or 'hostname:port' and returns (hostname, port).
        Defaults to port 443 if not specified.
        """
        if target is None:
            return None, 443
        text = str(target).strip()
        if text == "":
            return None, 443

        # If it's clearly a URL, parse it normally
        if "://" in text:
            parsed = urlparse(text)
            hostname = parsed.hostname
            port = parsed.port
            if hostname is None:
                return None, 443
            if port is None:
                port = 443
            return hostname.lower(), int(port)

        # If it's host:port, split safely
        # Note: URLs without scheme can be tricky (IPv6), but we'll handle [::1]:443 form later if needed
        if ":" in text and text.count(":") == 1:
            host_part, port_part = text.split(":")
            host_part = host_part.strip()
            port_part = port_part.strip()
            if host_part == "":
                return None, 443
            try:
                port_value = int(port_part)
            except Exception:
                port_value = 443
            return host_part.lower(), int(port_value)

        # Otherwise treat it as a bare hostname
        return text.lower(), 443
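

The weak-cipher heuristic inside `probe` can be exercised without a network connection if it is pulled out into a standalone function. The sketch below is one way to do that: `is_weak_cipher` and `WEAK_MARKERS` are hypothetical names that do not exist in this module, and the markers mirror the substring checks above.

```python
# Substrings that mark known-weak cipher families (RC4, 3DES, NULL, EXPORT, MD5).
WEAK_MARKERS = ("RC4", "3DES", "DES-CBC3", "NULL", "EXPORT", "EXP-", "-MD5")


def is_weak_cipher(cipher_name: str) -> bool:
    """Hypothetical helper: True if the cipher name contains a weak-family marker."""
    name_upper = str(cipher_name).upper()
    return any(marker in name_upper for marker in WEAK_MARKERS)


for name in ("RC4-SHA", "DES-CBC3-SHA", "TLS_AES_128_GCM_SHA256"):
    print(name, is_weak_cipher(name))
```

As in `probe`, this only classifies whichever cipher the server happened to select; it does not show that a server refuses weak ciphers when a client offers them.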