# SneakyScope/app/utils/enrichment.py
from urllib.parse import urlparse
import requests
import json
import whois
from datetime import datetime
from ipaddress import ip_address
import socket
# Optional: high-accuracy root-domain detection via the Public Suffix List.
# tldextract is in the requirements, but this guard keeps the module importable without it.
try:
    import tldextract
    _HAS_TLDEXTRACT = True
except Exception:
    _HAS_TLDEXTRACT = False
# Local imports
from app.utils.cache_db import get_cache
from app.utils.settings import get_settings
from app.utils.tls_probe import TLSEnumerator
# Configure logger
from app.logging_setup import get_app_logger
# Init cache
cache = get_cache("/data/cache.db")
settings = get_settings()
# Cache TTLs: convert the configured lifetimes from days to minutes (24 hours * 60 minutes)
MINUTES_PER_DAY = 24 * 60
GEOIP_DEFAULT_TTL = settings.cache.geoip_cache_days * MINUTES_PER_DAY
WHOIS_DEFAULT_TTL = settings.cache.whois_cache_days * MINUTES_PER_DAY
CRT_DEFAULT_TTL = settings.cache.crt_cache_days * MINUTES_PER_DAY
logger = get_app_logger()


def parse_target_to_host(target):
    """
    Convert a user-supplied string (URL or domain) into a hostname.

    Returns:
        str or None
    """
    if target is None:
        return None
    value = str(target).strip()
    if value == "":
        return None
    # urlparse needs a scheme to treat the first token as the netloc
    parsed = urlparse(value if "://" in value else f"http://{value}")
    # If the input was something like "localhost:8080/path", netloc includes the port,
    # but .hostname strips it
    host = parsed.hostname
    if host is None:
        return None
    # Lowercase for consistency
    host = host.strip().lower()
    if host == "":
        return None
    return host
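
# Illustrative behaviour of parse_target_to_host (inputs below are placeholders, not from the app):
#
#   parse_target_to_host("https://sub.example.com:8443/path")  # -> "sub.example.com"
#   parse_target_to_host("localhost:8080/health")              # -> "localhost"
#   parse_target_to_host("   ")                                # -> None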


def get_root_domain(hostname):
    """
    Determine the registrable/root domain from a hostname.
    Prefers tldextract if available; otherwise falls back to a heuristic.

    Examples:
        sub.a.example.com -> example.com
        api.example.co.uk -> example.co.uk (PSL needed for correctness;
                             the fallback heuristic would return co.uk)
        portal.gov.uk     -> portal.gov.uk with the PSL, since gov.uk is a public suffix
                             (the fallback heuristic would return gov.uk)

    Returns:
        str (best-effort registrable domain)
    """
    if hostname is None:
        return None
    if _HAS_TLDEXTRACT:
        # tldextract returns subdomain, domain, suffix separately using PSL rules
        # e.g., subdomain="api", domain="example", suffix="co.uk"
        parts = tldextract.extract(hostname)
        # If suffix is empty (e.g., localhost), fall back to the hostname itself
        if parts.suffix:
            return f"{parts.domain}.{parts.suffix}".lower()
        else:
            return hostname.lower()
    # Fallback heuristic: last two labels (not perfect for multi-part TLDs, but safe)
    labels = hostname.split(".")
    labels = [lbl for lbl in labels if lbl]  # drop empty labels (e.g. from a trailing dot)
    if len(labels) >= 2:
        last = labels[-1]
        second_last = labels[-2]
        candidate = f"{second_last}.{last}".lower()
        return candidate
    return hostname.lower()
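
# Illustrative behaviour of get_root_domain; results for multi-part TLDs depend on
# whether tldextract (PSL-aware) is importable:
#
#   get_root_domain("api.example.co.uk")  # -> "example.co.uk" with tldextract, "co.uk" via the fallback
#   get_root_domain("localhost")          # -> "localhost"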


def is_root_domain(hostname):
    """
    Is the provided hostname the same as its registrable/root domain?
    """
    if hostname is None:
        return False
    root = get_root_domain(hostname)
    if root is None:
        return False
    return hostname.lower() == root.lower()


def search_certs(domain, wildcard=True, expired=True, deduplicate=True):
    """
    Search crt.sh for the given domain.

    domain      -- Domain to search for
    wildcard    -- Whether or not to prepend a wildcard to the domain
                   (default: True)
    expired     -- Whether or not to include expired certificates
                   (default: True)
    deduplicate -- Whether or not to request deduplicated results
                   (default: True)

    Return a list of objects, like so:

    {
        "issuer_ca_id": 16418,
        "issuer_name": "C=US, O=Let's Encrypt, CN=Let's Encrypt Authority X3",
        "name_value": "hatch.uber.com",
        "min_cert_id": 325717795,
        "min_entry_timestamp": "2018-02-08T16:47:39.089",
        "not_before": "2018-02-08T15:47:39"
    }
    """
    cache_key = f"crt_cert:{domain}"
    # Log whether caching is enabled
    logger.info(f"CRT Cache is set to: {settings.cache.crt_cache_enabled}")
    if settings.cache.crt_cache_enabled:
        cached = cache.read(cache_key)
        if cached:
            logger.info(f"[CACHE HIT] for CRT Cert: {domain}")
            return cached
        else:
            logger.info(f"[CACHE MISS] for CRT Cert: {domain} - {cache_key}")
    base_url = "https://crt.sh/?q={}&output=json"
    if not expired:
        base_url = base_url + "&exclude=expired"
    if deduplicate:
        base_url = base_url + "&deduplicate=Y"
    if wildcard and "%" not in domain:
        domain = "%.{}".format(domain)
    url = base_url.format(domain)
    ua = 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1'
    req = requests.get(url, headers={'User-Agent': ua})
    if req.ok:
        try:
            content = req.content.decode('utf-8')
            data = json.loads(content)
            # Cache the parsed response if caching is enabled
            if settings.cache.crt_cache_enabled:
                logger.info(f"Setting Cache for {cache_key}")
                cache.create(cache_key, data, CRT_DEFAULT_TTL)
            return data
        except ValueError:
            # crt.sh used to return concatenated JSON objects; they fixed their response,
            # so this fallback shouldn't be necessary anymore:
            # https://github.com/crtsh/certwatch_db/commit/f4f46ea37c23543c4cdf1a3c8867d68967641807
            data = json.loads("[{}]".format(content.replace('}{', '},{')))
            if settings.cache.crt_cache_enabled:
                logger.info(f"Setting Cache for {cache_key}")
                cache.create(cache_key, data, CRT_DEFAULT_TTL)
            return data
        except Exception as err:
            logger.error("Error retrieving cert information from CRT.sh: %s", err)
    return None
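
# Usage sketch for search_certs (live network call; crt.sh can be slow or rate-limited,
# and the domain below is a placeholder):
#
#   certs = search_certs("example.com", wildcard=True, expired=False)
#   if certs:
#       for entry in certs:
#           print(entry.get("name_value"), entry.get("not_before"))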


def gather_crtsh_certs_for_target(target):
    """
    Given a URL or domain-like input, return crt.sh results for:
      - The exact hostname
      - If the hostname is a subdomain, also the wildcard for the root domain
        (e.g., *.example.com)

    This runs even when the submitted scheme is plain HTTP (by design).
    Expired certs are excluded by default.

    Returns:
        dict:
        {
            "input": <original target>,
            "hostname": <parsed hostname>,
            "root_domain": <registrable>,
            "is_root_domain": <bool>,
            "crtsh": {
                "host_certs": [... or None],
                "wildcard_root_certs": [... or None]
            }
        }
    """
    result = {
        "input": target,
        "hostname": None,
        "root_domain": None,
        "is_root_domain": False,
        "crtsh": {
            "host_certs": None,
            "wildcard_root_certs": None
        }
    }
    try:
        hostname = parse_target_to_host(target)
        result["hostname"] = hostname
        # Return the placeholder result if no hostname could be parsed
        if hostname is None:
            return result
        root = get_root_domain(hostname)
        result["root_domain"] = root
        result["is_root_domain"] = is_root_domain(hostname)
        # Always query crt.sh for the specific hostname
        # (expired=False filters out expired certificates)
        host_certs = search_certs(hostname, wildcard=False, expired=False)
        result["crtsh"]["host_certs"] = host_certs
        # If the hostname is a subdomain, also look up the wildcard for the root domain: *.root
        if not result["is_root_domain"] and root:
            wildcard_certs = search_certs(root, wildcard=True, expired=False)
            result["crtsh"]["wildcard_root_certs"] = wildcard_certs
    except Exception as exc:
        logger.exception("crt.sh enrichment failed: %s", exc)
    return result
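
# Usage sketch for gather_crtsh_certs_for_target (hostnames are placeholders):
#
#   info = gather_crtsh_certs_for_target("https://portal.example.com/login")
#   # info["hostname"]    -> "portal.example.com"
#   # info["root_domain"] -> "example.com"
#   # info["crtsh"]["wildcard_root_certs"] holds the "%.example.com" results, or None on failure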


def enrich_url(url: str, fetch_ssl_enabled: bool = False) -> dict:
    """Perform WHOIS, GeoIP, and (optionally) SSL/TLS enrichment for a URL."""
    enrichment = {}
    # Extract hostname
    parsed = urlparse(url)
    hostname = parsed.hostname or url  # fall back to the raw input if parsing fails
    # --- WHOIS ---
    enrichment.update(enrich_whois(hostname))
    # --- GeoIP ---
    enrichment["geoip"] = enrich_geoip(hostname)
    # === SSL/TLS: crt.sh + live probe ===
    # Only run the slower SSL/TLS checks when requested at submission time
    if fetch_ssl_enabled:
        try:
            # 1) Certificate Transparency results from crt.sh
            crtsh_info = gather_crtsh_certs_for_target(url)
            # 2) Live TLS probe (versions + negotiated cipher per version)
            tls_enum = TLSEnumerator(timeout_seconds=5.0)
            probe_result = tls_enum.probe(url)
            enrichment["ssl_tls"] = {}
            enrichment["ssl_tls"]["crtsh"] = crtsh_info
            enrichment["ssl_tls"]["probe"] = probe_result.to_dict()
        except Exception as exc:
            logger.exception("SSL/TLS enrichment failed: %s", exc)
            enrichment["ssl_tls"] = {"error": "SSL/TLS enrichment failed"}
    else:
        # Include a small marker so the UI can show "skipped"
        enrichment["ssl_tls"] = {"skipped": True, "reason": "Disabled on submission"}
    return enrichment
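
# Usage sketch for enrich_url (performs live WHOIS/GeoIP lookups; the URL is a placeholder):
#
#   data = enrich_url("https://example.com", fetch_ssl_enabled=True)
#   data["whois"]    # registrar / creation / expiration / owner strings
#   data["geoip"]    # per-IP dicts from ip-api.com
#   data["ssl_tls"]  # {"crtsh": ..., "probe": ...}, or {"skipped": True, ...} when disabled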


def enrich_whois(hostname: str) -> dict:
    """Fetch WHOIS info using python-whois with safe type handling."""
    cache_key = f"whois:{hostname}"
    cached = cache.read(cache_key)
    if cached:
        logger.info(f"[CACHE HIT] for WHOIS: {hostname}")
        return cached
    logger.info(f"[CACHE MISS] for WHOIS: {hostname}")
    result = {}
    try:
        w = whois.whois(hostname)

        def format_dt(val):
            if isinstance(val, list):
                return ", ".join(
                    [v.strftime("%Y-%m-%d %H:%M:%S") if isinstance(v, datetime) else str(v) for v in val]
                )
            elif isinstance(val, datetime):
                return val.strftime("%Y-%m-%d %H:%M:%S")
            elif val is None:
                return "Possible Privacy"
            else:
                return str(val)

        result["whois"] = {
            "registrar": format_dt(getattr(w, "registrar", None)),
            "creation_date": format_dt(getattr(w, "creation_date", None)),
            "expiration_date": format_dt(getattr(w, "expiration_date", None)),
            "owner": format_dt(getattr(w, "org", None))
        }
    except Exception as e:
        logger.warning(f"WHOIS lookup failed for {hostname}: {e}")
        try:
            # fallback raw whois text
            import subprocess
            raw_output = subprocess.check_output(["whois", hostname], encoding="utf-8", errors="ignore")
            result["whois"] = {}
            result["raw_whois"] = raw_output
        except Exception as raw_e:
            logger.error(f"Raw WHOIS also failed: {raw_e}")
            result["whois"] = {}
            result["raw_whois"] = "N/A"
    cache.create(cache_key, result, WHOIS_DEFAULT_TTL)
    return result
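
# Usage sketch for enrich_whois (results are cached for WHOIS_DEFAULT_TTL; the domain is a placeholder):
#
#   info = enrich_whois("example.com")
#   info["whois"].get("registrar")  # registrar string, or "Possible Privacy" when hidden
#   info.get("raw_whois")           # only present when python-whois failed and the CLI fallback ran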


def enrich_geoip(hostname: str) -> dict:
    """Resolve hostname to IPs and fetch info from ip-api.com."""
    CLOUDFLARE_ASN = "AS13335 Cloudflare"
    geo_info = {}
    ips = extract_ips_from_url(hostname)
    for ip in ips:
        ip_str = str(ip)
        cache_key = f"geoip:{ip_str}"
        cached = cache.read(cache_key)
        if cached:
            logger.info(f"[CACHE HIT] for GEOIP: {ip}")
            geo_info[ip_str] = cached
            continue
        logger.info(f"[CACHE MISS] for GEOIP: {ip}")
        try:
            # "fields" is ip-api.com's numeric bitmask selecting which JSON fields to return
            resp = requests.get(f"http://ip-api.com/json/{ip_str}?fields=24313855", timeout=5)
            if resp.status_code == 200:
                geo_info[ip_str] = resp.json()
                asname = geo_info[ip_str].get("as")
                # Flag IPs that sit behind Cloudflare (guard against a missing "as" field)
                if asname and CLOUDFLARE_ASN in asname:
                    geo_info[ip_str].update({"cloudflare": True})
            else:
                geo_info[ip_str] = {"error": f"HTTP {resp.status_code}"}
        except Exception as e:
            geo_info[ip_str] = {"error": str(e)}
        cache.create(cache_key, geo_info[ip_str], GEOIP_DEFAULT_TTL)
    return geo_info
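
# Usage sketch for enrich_geoip (one ip-api.com request per resolved IP, cached per IP;
# available keys depend on the ip-api field mask):
#
#   geo = enrich_geoip("example.com")
#   for ip, details in geo.items():
#       print(ip, details.get("as"), details.get("cloudflare", False), details.get("error"))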


def extract_ips_from_url(hostname: str):
    """Resolve hostname to IPs."""
    try:
        info = socket.getaddrinfo(hostname, None)
        return list({ip_address(x[4][0]) for x in info})
    except Exception:
        return []
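
# Usage sketch for extract_ips_from_url (results depend on the local resolver; hostnames are placeholders):
#
#   extract_ips_from_url("example.com")          # -> deduplicated list of IPv4Address/IPv6Address objects
#   extract_ips_from_url("nonexistent.invalid")  # -> []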