- Add SSL/TLS intelligence pipeline:
- crt.sh lookup with expired-filtering and root-domain wildcard resolution
- live TLS version/cipher probe with weak/legacy flags and probe notes
- UI: card + matrix rendering, raw JSON toggle, and host/wildcard cert lists
- Front page: checkbox to optionally fetch certificate/CT data
- Introduce `URLNormalizer` with punycode support and typo repair
- Auto-prepend `https://` for bare domains (e.g., `google.com`)
- Optional quick HTTPS reachability + `http://` fallback
- Provide singleton via function-cached `@singleton_loader`:
- `get_url_normalizer()` reads defaults from Settings (if present)
- Standardize function-rule return shape to `(bool, dict|None)` across
`form_*` and `script_*` rules; include structured payloads (`note`, hosts, ext, etc.)
- Harden `FunctionRuleAdapter`:
- Coerce legacy returns `(bool)`, `(bool, str)` → normalized outputs
- Adapt non-dict inputs to facts (category-aware and via provided adapter)
- Return `(True, dict)` on match, `(False, None)` on miss
- Bind-time logging with file:line + function id for diagnostics
- `RuleEngine`:
- Back rules by private `self._rules`; `rules` property returns copy
- Idempotent `add_rule(replace=False)` with in-place replace and regex (re)compile
- Fix AttributeError from property assignment during `__init__`
- Replace hidden singleton factory with explicit builder + global state:
- `app/rules/factory.py::build_rules_engine()` builds and logs totals
- `app/state.py` exposes `set_rules_engine()` / `get_rules_engine()` as the SOF
- `app/wsgi.py` builds once at preload and publishes via `set_rules_engine()`
- Add lightweight debug hooks (`SS_DEBUG_RULES=1`) to trace engine id and rule counts
- Unify logging wiring:
- `wire_logging_once(app)` clears and attaches a single handler chain
- Create two named loggers: `sneakyscope.app` and `sneakyscope.engine`
- Disable propagation to prevent dupes; include pid/logger name in format
- Remove stray/duplicate handlers and import-time logging
- Optional dedup filter for bursty repeats (kept off by default)
- Gunicorn: enable `--preload` in entrypoint to avoid thread races and double registration
- Documented foreground vs background log “double consumer” caveat (attach vs `compose logs`)
- Jinja: replace `{% return %}` with structured `if/elif/else` branches
- Add toggle button to show raw JSON for TLS/CT section
- Consumers should import the rules engine via:
- `from app.state import get_rules_engine`
- Use `build_rules_engine()` **only** during preload/init to construct the instance,
then publish with `set_rules_engine()`. Do not call old singleton factories.
- New/changed modules (high level):
- `app/utils/urltools.py` (+) — URLNormalizer + `get_url_normalizer()`
- `app/rules/function_rules.py` (±) — normalized payload returns
- `engine/function_rule_adapter.py` (±) — coercion, fact adaptation, bind logs
- `app/utils/rules_engine.py` (±) — `_rules`, idempotent `add_rule`, fixes
- `app/rules/factory.py` (±) — pure builder; totals logged post-registration
- `app/state.py` (+) — process-global rules engine
- `app/logging_setup.py` (±) — single chain, two named loggers
- `app/wsgi.py` (±) — preload build + `set_rules_engine()`
- `entrypoint.sh` (±) — add `--preload`
- templates (±) — TLS card, raw toggle; front-page checkbox
Closes: flaky rule-type warnings, duplicate logs, and multi-worker race on rules init.
271 lines
10 KiB
Python
271 lines
10 KiB
Python
import socket
|
|
import ssl
|
|
import time
|
|
import logging
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
class TLSProbeResult:
|
|
"""
|
|
Container for the results of a TLS probe across protocol versions.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.hostname = None
|
|
self.port = 443
|
|
self.results_by_version = {} # e.g., {"TLS1.2": {"supported": True, "cipher": "TLS_AES_128_GCM_SHA256", ...}}
|
|
self.weak_protocols = [] # e.g., ["TLS1.0", "TLS1.1"]
|
|
self.weak_ciphers = [] # e.g., ["RC4-SHA"]
|
|
self.errors = [] # textual errors encountered during probing
|
|
|
|
def to_dict(self):
|
|
"""
|
|
Convert the object to a serializable dictionary.
|
|
"""
|
|
output = {
|
|
"hostname": self.hostname,
|
|
"port": self.port,
|
|
"results_by_version": self.results_by_version,
|
|
"weak_protocols": self.weak_protocols,
|
|
"weak_ciphers": self.weak_ciphers,
|
|
"errors": self.errors
|
|
}
|
|
return output
|
|
|
|
|
|
class TLSEnumerator:
|
|
"""
|
|
Enumerate supported TLS versions for a server by attempting handshakes with constrained contexts.
|
|
Also collects the server-selected cipher for each successful handshake.
|
|
|
|
Notes:
|
|
- We do NOT validate certificates; this is posture discovery, not trust verification.
|
|
- Cipher enumeration is limited to "what was negotiated with default cipher list" per version.
|
|
Deep cipher scanning (per-cipher attempts) can be added later if needed.
|
|
"""
|
|
|
|
def __init__(self, timeout_seconds=5.0):
|
|
self.timeout_seconds = float(timeout_seconds)
|
|
|
|
def _build_context_for_version(self, tls_version_label):
|
|
"""
|
|
Build an SSLContext that only allows the specified TLS version.
|
|
"""
|
|
# Base client context
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
|
|
# Disable certificate checks so we can probe misconfigured/self-signed endpoints
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl.CERT_NONE
|
|
|
|
# Constrain to a single protocol version using minimum/maximum
|
|
# Map label -> ssl.TLSVersion
|
|
if tls_version_label == "TLS1.0" and hasattr(ssl.TLSVersion, "TLSv1"):
|
|
context.minimum_version = ssl.TLSVersion.TLSv1
|
|
context.maximum_version = ssl.TLSVersion.TLSv1
|
|
elif tls_version_label == "TLS1.1" and hasattr(ssl.TLSVersion, "TLSv1_1"):
|
|
context.minimum_version = ssl.TLSVersion.TLSv1_1
|
|
context.maximum_version = ssl.TLSVersion.TLSv1_1
|
|
elif tls_version_label == "TLS1.2" and hasattr(ssl.TLSVersion, "TLSv1_2"):
|
|
context.minimum_version = ssl.TLSVersion.TLSv1_2
|
|
context.maximum_version = ssl.TLSVersion.TLSv1_2
|
|
elif tls_version_label == "TLS1.3" and hasattr(ssl.TLSVersion, "TLSv1_3"):
|
|
context.minimum_version = ssl.TLSVersion.TLSv1_3
|
|
context.maximum_version = ssl.TLSVersion.TLSv1_3
|
|
else:
|
|
# Version not supported by this Python/OpenSSL build
|
|
return None
|
|
|
|
# Keep default cipher list; we only want to see what is negotiated
|
|
# You can later set context.set_ciphers("...") for deeper scans.
|
|
return context
|
|
|
|
def _attempt_handshake(self, hostname, port, context):
|
|
"""
|
|
Attempt a TLS handshake to (hostname, port) using the given context.
|
|
Returns a tuple: (supported(bool), selected_cipher(str or None), elapsed_seconds(float or None), error(str or None))
|
|
"""
|
|
supported = False
|
|
selected_cipher = None
|
|
elapsed = None
|
|
error_text = None
|
|
|
|
# Create a TCP connection with a timeout
|
|
sock = None
|
|
ssock = None
|
|
start = None
|
|
try:
|
|
# Resolve and connect
|
|
# Note: create_connection will handle IPv4/IPv6 resolution
|
|
sock = socket.create_connection((hostname, port), timeout=self.timeout_seconds)
|
|
|
|
# Start timer right before TLS wrap to capture handshake duration mainly
|
|
start = time.time()
|
|
|
|
# SNI is important: pass server_hostname
|
|
ssock = context.wrap_socket(sock, server_hostname=hostname)
|
|
|
|
# Access negotiated cipher; returns (cipher_name, protocol, secret_bits)
|
|
cipher_info = ssock.cipher()
|
|
if cipher_info is not None and len(cipher_info) >= 1:
|
|
selected_cipher = str(cipher_info[0])
|
|
|
|
supported = True
|
|
elapsed = time.time() - start
|
|
|
|
except Exception as exc:
|
|
# Capture the error for diagnostics
|
|
error_text = f"{type(exc).__name__}: {str(exc)}"
|
|
elapsed = None
|
|
finally:
|
|
# Clean up sockets
|
|
try:
|
|
if ssock is not None:
|
|
ssock.close()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
if sock is not None:
|
|
sock.close()
|
|
except Exception:
|
|
pass
|
|
|
|
return supported, selected_cipher, elapsed, error_text
|
|
|
|
def probe(self, target):
|
|
"""
|
|
Probe the target (URL or hostname or 'hostname:port') for TLS 1.0/1.1/1.2/1.3 support.
|
|
Returns TLSProbeResult.
|
|
"""
|
|
result = TLSProbeResult()
|
|
host, port = self._parse_target_to_host_port(target)
|
|
result.hostname = host
|
|
result.port = port
|
|
|
|
if host is None:
|
|
result.errors.append("Unable to parse a hostname from the target.")
|
|
return result
|
|
|
|
# Define the versions we will test, in ascending order
|
|
versions_to_test = ["TLS1.0", "TLS1.1", "TLS1.2", "TLS1.3"]
|
|
|
|
# Iterate explicitly to match your coding style preference
|
|
for version_label in versions_to_test:
|
|
context = self._build_context_for_version(version_label)
|
|
|
|
# If this Python/OpenSSL cannot restrict to this version, mark as unsupported_by_runtime
|
|
if context is None:
|
|
version_outcome = {
|
|
"supported": False,
|
|
"selected_cipher": None,
|
|
"handshake_seconds": None,
|
|
"error": "Version not supported by local runtime"
|
|
}
|
|
result.results_by_version[version_label] = version_outcome
|
|
continue
|
|
|
|
supported, cipher, elapsed, err = self._attempt_handshake(host, port, context)
|
|
|
|
version_outcome = {
|
|
"supported": supported,
|
|
"selected_cipher": cipher,
|
|
"handshake_seconds": elapsed,
|
|
"error": err
|
|
}
|
|
result.results_by_version[version_label] = version_outcome
|
|
|
|
# Determine weak protocols (if the handshake succeeded on legacy versions)
|
|
# RFC 8996 and industry guidance deprecate TLS 1.0 and 1.1.
|
|
try:
|
|
v10 = result.results_by_version.get("TLS1.0")
|
|
if v10 is not None and v10.get("supported") is True:
|
|
result.weak_protocols.append("TLS1.0")
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
v11 = result.results_by_version.get("TLS1.1")
|
|
if v11 is not None and v11.get("supported") is True:
|
|
result.weak_protocols.append("TLS1.1")
|
|
except Exception:
|
|
pass
|
|
|
|
# Flag weak ciphers encountered in any successful negotiation
|
|
# This is a heuristic: we only see the single chosen cipher per version.
|
|
try:
|
|
for label in ["TLS1.0", "TLS1.1", "TLS1.2", "TLS1.3"]:
|
|
outcome = result.results_by_version.get(label)
|
|
if outcome is None:
|
|
continue
|
|
if outcome.get("supported") is not True:
|
|
continue
|
|
|
|
cipher_name = outcome.get("selected_cipher")
|
|
if cipher_name is None:
|
|
continue
|
|
|
|
# Simple string-based checks for known-weak families
|
|
# (RC4, 3DES, NULL, EXPORT, MD5). Expand as needed.
|
|
name_upper = str(cipher_name).upper()
|
|
is_weak = False
|
|
|
|
if "RC4" in name_upper:
|
|
is_weak = True
|
|
elif "3DES" in name_upper or "DES-CBC3" in name_upper:
|
|
is_weak = True
|
|
elif "NULL" in name_upper:
|
|
is_weak = True
|
|
elif "EXPORT" in name_upper or "EXP-" in name_upper:
|
|
is_weak = True
|
|
elif "-MD5" in name_upper:
|
|
is_weak = True
|
|
|
|
if is_weak:
|
|
# Avoid duplicates
|
|
if cipher_name not in result.weak_ciphers:
|
|
result.weak_ciphers.append(cipher_name)
|
|
except Exception as exc:
|
|
result.errors.append(f"Cipher analysis error: {exc}")
|
|
|
|
return result
|
|
|
|
def _parse_target_to_host_port(self, target):
|
|
"""
|
|
Accepts URL, hostname, or 'hostname:port' and returns (hostname, port).
|
|
Defaults to port 443 if not specified.
|
|
"""
|
|
if target is None:
|
|
return None, 443
|
|
|
|
text = str(target).strip()
|
|
if text == "":
|
|
return None, 443
|
|
|
|
# If it's clearly a URL, parse it normally
|
|
if "://" in text:
|
|
parsed = urlparse(text)
|
|
hostname = parsed.hostname
|
|
port = parsed.port
|
|
if hostname is None:
|
|
return None, 443
|
|
if port is None:
|
|
port = 443
|
|
return hostname.lower(), int(port)
|
|
|
|
# If it's host:port, split safely
|
|
# Note: URLs without scheme can be tricky (IPv6), but we'll handle [::1]:443 form later if needed
|
|
if ":" in text and text.count(":") == 1:
|
|
host_part, port_part = text.split(":")
|
|
host_part = host_part.strip()
|
|
port_part = port_part.strip()
|
|
if host_part == "":
|
|
return None, 443
|
|
try:
|
|
port_value = int(port_part)
|
|
except Exception:
|
|
port_value = 443
|
|
return host_part.lower(), int(port_value)
|
|
|
|
# Otherwise treat it as a bare hostname
|
|
return text.lower(), 443
|