Files
SneakyScan/src/scanner.py
Phillip Tarrant 212596fa0a Add automatic multi-format report generation and ZIP archiving
Implements automatic generation of JSON, HTML, and ZIP outputs after every scan,
with all files sharing the same timestamp for easy correlation.

Features:
- Automatic HTML report generation after every scan
- ZIP archive creation containing JSON, HTML, and all screenshots
- Unified timestamp across all outputs (JSON, HTML, ZIP, screenshots)
- Graceful error handling (scan continues if HTML/ZIP generation fails)
- Email-ready ZIP archives for easy sharing

Technical changes:
- Fixed timestamp mismatch between scan() and save_report()
- Added generate_outputs() method to SneakyScanner class
- scan() now returns (report, timestamp) tuple
- save_report() accepts timestamp parameter instead of generating new one
- main() updated to call generate_outputs() for all output formats
- Added zipfile import and HTMLReportGenerator import
- Dockerfile updated to copy templates/ directory

Output structure:
- scan_report_YYYYMMDD_HHMMSS.json (JSON report)
- scan_report_YYYYMMDD_HHMMSS.html (HTML report)
- scan_report_YYYYMMDD_HHMMSS.zip (archive with JSON, HTML, screenshots)
- scan_report_YYYYMMDD_HHMMSS_screenshots/ (screenshots directory)

Documentation updated:
- README.md: Updated Output Format, Features, Quick Start sections
- CLAUDE.md: Updated Core Components, Scan Workflow, Key Design Decisions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-14 02:10:31 +00:00

827 lines
29 KiB
Python

#!/usr/bin/env python3
"""
SneakyScanner - Masscan-based network scanner with YAML configuration
"""
import argparse
import json
import logging
import subprocess
import sys
import tempfile
import time
import xml.etree.ElementTree as ET
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Tuple

import yaml
from libnmap.parser import NmapParser
from libnmap.process import NmapProcess

from report_generator import HTMLReportGenerator
from screenshot_capture import ScreenshotCapture
# Switch both standard streams to line buffering so each printed line is
# emitted immediately (wanted when the scanner runs under Docker).
for _std_stream in (sys.stdout, sys.stderr):
    _std_stream.reconfigure(line_buffering=True)
del _std_stream
class SneakyScanner:
"""Wrapper for masscan to perform network scans based on YAML config"""
def __init__(self, config_path: str, output_dir: str = "/app/output"):
    """
    Remember the paths, create the output directory and load the config

    Args:
        config_path: Path to the YAML configuration file
        output_dir: Directory for scan outputs; created together with any
            missing parents
    """
    self.config_path = Path(config_path)
    out_dir = Path(output_dir)
    self.output_dir = out_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    self.config = self._load_config()
    # Stays None until scan() installs a ScreenshotCapture instance
    self.screenshot_capture = None
def _load_config(self) -> Dict[str, Any]:
"""Load and validate YAML configuration"""
if not self.config_path.exists():
raise FileNotFoundError(f"Config file not found: {self.config_path}")
with open(self.config_path, 'r') as f:
config = yaml.safe_load(f)
if not config.get('title'):
raise ValueError("Config must include 'title' field")
if not config.get('sites'):
raise ValueError("Config must include 'sites' field")
return config
def _run_masscan(self, targets: List[str], ports: str, protocol: str) -> List[Dict]:
"""
Run masscan and return parsed results
Args:
targets: List of IP addresses to scan
ports: Port range string (e.g., "0-65535")
protocol: "tcp" or "udp"
"""
if not targets:
return []
# Create temporary file for targets
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write('\n'.join(targets))
target_file = f.name
# Create temporary output file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
output_file = f.name
try:
# Build command based on protocol
if protocol == 'tcp':
cmd = [
'masscan',
'-iL', target_file,
'-p', ports,
'--rate', '10000',
'-oJ', output_file,
'--wait', '0'
]
elif protocol == 'udp':
cmd = [
'masscan',
'-iL', target_file,
'--udp-ports', ports,
'--rate', '10000',
'-oJ', output_file,
'--wait', '0'
]
else:
raise ValueError(f"Invalid protocol: {protocol}")
print(f"Running: {' '.join(cmd)}", flush=True)
result = subprocess.run(cmd, capture_output=True, text=True)
print(f"Masscan {protocol.upper()} scan completed", flush=True)
if result.returncode != 0:
print(f"Masscan stderr: {result.stderr}", file=sys.stderr)
# Parse masscan JSON output
results = []
with open(output_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
try:
results.append(json.loads(line.rstrip(',')))
except json.JSONDecodeError:
continue
return results
finally:
# Cleanup temp files
Path(target_file).unlink(missing_ok=True)
Path(output_file).unlink(missing_ok=True)
def _run_ping_scan(self, targets: List[str]) -> Dict[str, bool]:
"""
Run ping scan using masscan ICMP echo
Returns:
Dict mapping IP addresses to ping response status
"""
if not targets:
return {}
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write('\n'.join(targets))
target_file = f.name
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
output_file = f.name
try:
cmd = [
'masscan',
'-iL', target_file,
'--ping',
'--rate', '10000',
'-oJ', output_file,
'--wait', '0'
]
print(f"Running: {' '.join(cmd)}", flush=True)
result = subprocess.run(cmd, capture_output=True, text=True)
print(f"Masscan PING scan completed", flush=True)
if result.returncode != 0:
print(f"Masscan stderr: {result.stderr}", file=sys.stderr, flush=True)
# Parse results
responding_ips = set()
with open(output_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
try:
data = json.loads(line.rstrip(','))
if 'ip' in data:
responding_ips.add(data['ip'])
except json.JSONDecodeError:
continue
# Create result dict for all targets
return {ip: (ip in responding_ips) for ip in targets}
finally:
Path(target_file).unlink(missing_ok=True)
Path(output_file).unlink(missing_ok=True)
def _run_nmap_service_detection(self, ip_ports: Dict[str, List[int]]) -> Dict[str, List[Dict]]:
"""
Run nmap service detection on discovered ports
Args:
ip_ports: Dict mapping IP addresses to list of TCP ports
Returns:
Dict mapping IP addresses to list of service info dicts
"""
if not ip_ports:
return {}
all_services = {}
for ip, ports in ip_ports.items():
if not ports:
all_services[ip] = []
continue
# Build port list string
port_list = ','.join(map(str, sorted(ports)))
print(f" Scanning {ip} ports {port_list}...", flush=True)
# Create temporary output file for XML
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.xml') as f:
xml_output = f.name
try:
# Run nmap with service detection
cmd = [
'nmap',
'-sV', # Service version detection
'--version-intensity', '5', # Balanced speed/accuracy
'-p', port_list,
'-oX', xml_output, # XML output
'--host-timeout', '5m', # Timeout per host
ip
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
if result.returncode != 0:
print(f" Nmap warning for {ip}: {result.stderr}", file=sys.stderr, flush=True)
# Parse XML output
services = self._parse_nmap_xml(xml_output)
all_services[ip] = services
except subprocess.TimeoutExpired:
print(f" Nmap timeout for {ip}, skipping service detection", file=sys.stderr, flush=True)
all_services[ip] = []
except Exception as e:
print(f" Nmap error for {ip}: {e}", file=sys.stderr, flush=True)
all_services[ip] = []
finally:
Path(xml_output).unlink(missing_ok=True)
return all_services
def _parse_nmap_xml(self, xml_file: str) -> List[Dict]:
"""
Parse nmap XML output to extract service information
Args:
xml_file: Path to nmap XML output file
Returns:
List of service info dictionaries
"""
services = []
try:
tree = ET.parse(xml_file)
root = tree.getroot()
# Find all ports
for port_elem in root.findall('.//port'):
port_id = port_elem.get('portid')
protocol = port_elem.get('protocol', 'tcp')
# Get state
state_elem = port_elem.find('state')
if state_elem is None or state_elem.get('state') != 'open':
continue
# Get service info
service_elem = port_elem.find('service')
if service_elem is not None:
service_info = {
'port': int(port_id),
'protocol': protocol,
'service': service_elem.get('name', 'unknown'),
'product': service_elem.get('product', ''),
'version': service_elem.get('version', ''),
'extrainfo': service_elem.get('extrainfo', ''),
'ostype': service_elem.get('ostype', '')
}
# Clean up empty fields
service_info = {k: v for k, v in service_info.items() if v}
services.append(service_info)
else:
# Port is open but no service info
services.append({
'port': int(port_id),
'protocol': protocol,
'service': 'unknown'
})
except Exception as e:
print(f" Error parsing nmap XML: {e}", file=sys.stderr, flush=True)
return services
def _is_likely_web_service(self, service: Dict) -> bool:
"""
Check if a service is likely HTTP/HTTPS based on nmap detection or common web ports
Args:
service: Service dictionary from nmap results
Returns:
True if service appears to be web-related
"""
# Check service name
web_services = ['http', 'https', 'ssl', 'http-proxy', 'https-alt',
'http-alt', 'ssl/http', 'ssl/https']
service_name = service.get('service', '').lower()
if service_name in web_services:
return True
# Check common non-standard web ports
web_ports = [80, 443, 8000, 8006, 8008, 8080, 8081, 8443, 8888, 9443]
port = service.get('port')
return port in web_ports
def _detect_http_https(self, ip: str, port: int, timeout: int = 5) -> str:
"""
Detect if a port is HTTP or HTTPS
Args:
ip: IP address
port: Port number
timeout: Connection timeout in seconds
Returns:
'http', 'https', or 'unknown'
"""
import socket
import ssl as ssl_module
# Try HTTPS first
try:
context = ssl_module.create_default_context()
context.check_hostname = False
context.verify_mode = ssl_module.CERT_NONE
with socket.create_connection((ip, port), timeout=timeout) as sock:
with context.wrap_socket(sock, server_hostname=ip) as ssock:
return 'https'
except ssl_module.SSLError:
# Not HTTPS, try HTTP
pass
except (socket.timeout, socket.error, ConnectionRefusedError):
return 'unknown'
# Try HTTP
try:
with socket.create_connection((ip, port), timeout=timeout) as sock:
sock.send(b'HEAD / HTTP/1.0\r\n\r\n')
response = sock.recv(1024)
if b'HTTP' in response:
return 'http'
except (socket.timeout, socket.error, ConnectionRefusedError):
pass
return 'unknown'
def _analyze_ssl_tls(self, ip: str, port: int) -> Dict[str, Any]:
    """
    Analyze SSL/TLS configuration including certificate and supported versions

    Args:
        ip: IP address
        port: Port number

    Returns:
        Dictionary with three keys:
        'certificate' - details of the leaf certificate (empty if unavailable),
        'tls_versions' - per-version dict with 'supported' and 'cipher_suites'
        for TLS 1.0 through TLS 1.3,
        'errors' - list of error strings collected during the analysis.
        All values are JSON-serialisable.
    """
    # Imported lazily so the heavy sslyze dependency is only loaded when an
    # HTTPS service is actually analysed.
    from sslyze import (
        Scanner,
        ServerScanRequest,
        ServerNetworkLocation,
        ScanCommand,
        ScanCommandAttemptStatusEnum,
        ServerScanStatusEnum
    )
    from cryptography import x509
    from datetime import datetime

    result = {
        'certificate': {},
        'tls_versions': {},
        'errors': []
    }
    try:
        server_location = ServerNetworkLocation(
            hostname=ip,
            port=port
        )
        # NOTE(review): the SSL 2.0 / SSL 3.0 scans are requested here, but
        # only TLS 1.0-1.3 results are copied into the report below.
        scan_request = ServerScanRequest(
            server_location=server_location,
            scan_commands={
                ScanCommand.CERTIFICATE_INFO,
                ScanCommand.SSL_2_0_CIPHER_SUITES,
                ScanCommand.SSL_3_0_CIPHER_SUITES,
                ScanCommand.TLS_1_0_CIPHER_SUITES,
                ScanCommand.TLS_1_1_CIPHER_SUITES,
                ScanCommand.TLS_1_2_CIPHER_SUITES,
                ScanCommand.TLS_1_3_CIPHER_SUITES,
            }
        )
        scanner = Scanner()
        scanner.queue_scans([scan_request])

        for scan_result in scanner.get_results():
            if scan_result.scan_status != ServerScanStatusEnum.COMPLETED:
                result['errors'].append('Connection failed')
                return result
            server_scan_result = scan_result.scan_result

            # Certificate details from the first deployment's leaf certificate
            cert_attempt = getattr(server_scan_result, 'certificate_info', None)
            if cert_attempt and cert_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
                cert_result = cert_attempt.result
                if cert_result.certificate_deployments:
                    deployment = cert_result.certificate_deployments[0]
                    leaf_cert = deployment.received_certificate_chain[0]

                    # Both datetimes share the certificate's tzinfo, so the
                    # subtraction is between aware values.
                    not_after = leaf_cert.not_valid_after_utc
                    days_until_expiry = (not_after - datetime.now(not_after.tzinfo)).days

                    sans = []
                    try:
                        san_ext = leaf_cert.extensions.get_extension_for_class(
                            x509.SubjectAlternativeName
                        )
                        # SAN values are not all strings (IP-address entries
                        # carry ipaddress objects); stringify them so the
                        # report can be written with json.dump.
                        sans = [str(name.value) for name in san_ext.value]
                    except x509.ExtensionNotFound:
                        pass

                    result['certificate'] = {
                        'subject': leaf_cert.subject.rfc4514_string(),
                        'issuer': leaf_cert.issuer.rfc4514_string(),
                        'serial_number': str(leaf_cert.serial_number),
                        'not_valid_before': leaf_cert.not_valid_before_utc.isoformat(),
                        'not_valid_after': leaf_cert.not_valid_after_utc.isoformat(),
                        'days_until_expiry': days_until_expiry,
                        'sans': sans
                    }

            # A version counts as supported when at least one cipher suite
            # was accepted; a scan that did not complete is reported as
            # unsupported.
            tls_attributes = {
                'TLS 1.0': 'tls_1_0_cipher_suites',
                'TLS 1.1': 'tls_1_1_cipher_suites',
                'TLS 1.2': 'tls_1_2_cipher_suites',
                'TLS 1.3': 'tls_1_3_cipher_suites'
            }
            for version_name, attr_name in tls_attributes.items():
                tls_attempt = getattr(server_scan_result, attr_name, None)
                if tls_attempt and tls_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
                    tls_result = tls_attempt.result
                    cipher_suites = [
                        suite.cipher_suite.name
                        for suite in tls_result.accepted_cipher_suites
                    ]
                    result['tls_versions'][version_name] = {
                        'supported': len(cipher_suites) > 0,
                        'cipher_suites': cipher_suites
                    }
                else:
                    result['tls_versions'][version_name] = {
                        'supported': False,
                        'cipher_suites': []
                    }
    except Exception as e:
        # Best-effort analysis: record the failure and return what was gathered
        result['errors'].append(str(e))
    return result
def _run_http_analysis(self, ip_services: Dict[str, List[Dict]]) -> Dict[str, Dict[int, Dict]]:
"""
Analyze HTTP/HTTPS services and SSL/TLS configuration
Args:
ip_services: Dict mapping IP addresses to their service lists
Returns:
Dict mapping IPs to port-specific HTTP analysis results
"""
if not ip_services:
return {}
all_results = {}
for ip, services in ip_services.items():
ip_results = {}
for service in services:
if not self._is_likely_web_service(service):
continue
port = service['port']
print(f" Analyzing {ip}:{port}...", flush=True)
# Detect HTTP vs HTTPS
protocol = self._detect_http_https(ip, port, timeout=5)
if protocol == 'unknown':
continue
result = {'protocol': protocol}
# Capture screenshot if screenshot capture is enabled
if self.screenshot_capture:
try:
screenshot_path = self.screenshot_capture.capture(ip, port, protocol)
if screenshot_path:
result['screenshot'] = screenshot_path
except Exception as e:
print(f" Screenshot capture error for {ip}:{port}: {e}",
file=sys.stderr, flush=True)
# If HTTPS, analyze SSL/TLS
if protocol == 'https':
try:
ssl_info = self._analyze_ssl_tls(ip, port)
# Only include ssl_tls if we got meaningful data
if ssl_info.get('certificate') or ssl_info.get('tls_versions'):
result['ssl_tls'] = ssl_info
elif ssl_info.get('errors'):
# Log errors even if we don't include ssl_tls in output
print(f" SSL/TLS analysis failed for {ip}:{port}: {ssl_info['errors']}",
file=sys.stderr, flush=True)
except Exception as e:
print(f" SSL/TLS analysis error for {ip}:{port}: {e}",
file=sys.stderr, flush=True)
ip_results[port] = result
if ip_results:
all_results[ip] = ip_results
return all_results
def scan(self) -> Tuple[Dict[str, Any], str]:
    """
    Perform complete scan based on configuration

    Steps, in order: ping scan, TCP scan of ports 0-65535, UDP scan of
    ports 0-65535, nmap service detection on the discovered TCP ports, and
    HTTP/HTTPS plus SSL/TLS analysis of web-looking services.

    Returns:
        Tuple (report, scan_timestamp): the report dictionary and the
        YYYYMMDD_HHMMSS timestamp string taken at scan start, which is also
        handed to the screenshot capture.
    """
    print(f"Starting scan: {self.config['title']}", flush=True)
    print(f"Config: {self.config_path}", flush=True)

    start_time = time.time()
    scan_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    self.screenshot_capture = ScreenshotCapture(
        output_dir=str(self.output_dir),
        scan_timestamp=scan_timestamp,
        timeout=15
    )
    # Everything below runs under try/finally so the screenshot browser is
    # closed even when a scan step raises.
    try:
        # Collect all unique IPs. An IP listed under several sites keeps the
        # site and expectations of its last occurrence.
        ip_to_site = {}
        ip_expected = {}
        for site in self.config['sites']:
            site_name = site['name']
            for ip_config in site['ips']:
                ip = ip_config['address']
                ip_to_site[ip] = site_name
                ip_expected[ip] = ip_config.get('expected', {})
        all_ips = sorted(ip_to_site)
        print(f"Total IPs to scan: {len(all_ips)}", flush=True)

        print(f"\n[1/5] Performing ping scan on {len(all_ips)} IPs...", flush=True)
        ping_results = self._run_ping_scan(all_ips)

        print(f"\n[2/5] Performing TCP scan on {len(all_ips)} IPs (ports 0-65535)...", flush=True)
        tcp_results = self._run_masscan(all_ips, '0-65535', 'tcp')

        print(f"\n[3/5] Performing UDP scan on {len(all_ips)} IPs (ports 0-65535)...", flush=True)
        udp_results = self._run_masscan(all_ips, '0-65535', 'udp')

        # Gather open ports per IP in sets so repeated masscan records do
        # not produce duplicate entries.
        discovered = {ip: {'tcp_ports': set(), 'udp_ports': set()} for ip in all_ips}
        for key, records in (('tcp_ports', tcp_results), ('udp_ports', udp_results)):
            for record in records:
                ip = record.get('ip')
                if ip not in discovered:
                    continue
                # A missing or empty 'ports' list simply contributes nothing
                for port_entry in record.get('ports') or []:
                    port = port_entry.get('port')
                    # A falsy port (missing, or 0) is skipped, as before
                    if port:
                        discovered[ip][key].add(port)

        results_by_ip = {
            ip: {
                'site': ip_to_site[ip],
                'expected': ip_expected[ip],
                'actual': {
                    'ping': ping_results.get(ip, False),
                    'tcp_ports': sorted(discovered[ip]['tcp_ports']),
                    'udp_ports': sorted(discovered[ip]['udp_ports']),
                    'services': []
                }
            }
            for ip in all_ips
        }

        print(f"\n[4/5] Performing service detection on discovered TCP ports...", flush=True)
        ip_ports = {ip: results_by_ip[ip]['actual']['tcp_ports'] for ip in all_ips}
        service_results = self._run_nmap_service_detection(ip_ports)
        for ip, services in service_results.items():
            if ip in results_by_ip:
                results_by_ip[ip]['actual']['services'] = services

        print(f"\n[5/5] Analyzing HTTP/HTTPS services and SSL/TLS configuration...", flush=True)
        http_results = self._run_http_analysis(service_results)

        # Attach each port's HTTP analysis to the matching service entry
        for ip, port_results in http_results.items():
            if ip not in results_by_ip:
                continue
            for service in results_by_ip[ip]['actual']['services']:
                port = service.get('port')
                if port in port_results:
                    service['http_info'] = port_results[port]

        scan_duration = round(time.time() - start_time, 2)

        report = {
            'title': self.config['title'],
            # Same "...Z" text as before, built from an aware UTC datetime
            # because datetime.utcnow() is deprecated.
            'scan_time': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
            'scan_duration': scan_duration,
            'config_file': str(self.config_path),
            'sites': []
        }
        for site in self.config['sites']:
            report['sites'].append({
                'name': site['name'],
                'ips': [
                    {
                        'address': ip_config['address'],
                        'expected': ip_expected[ip_config['address']],
                        'actual': results_by_ip[ip_config['address']]['actual']
                    }
                    for ip_config in site['ips']
                ]
            })
        return report, scan_timestamp
    finally:
        # Clean up screenshot capture browser
        if self.screenshot_capture:
            self.screenshot_capture._close_browser()
def save_report(self, report: Dict[str, Any], scan_timestamp: str) -> Path:
    """
    Write the report as indented JSON into the output directory

    Args:
        report: Scan report dictionary
        scan_timestamp: Timestamp string used in the file name

    Returns:
        Path of the written scan_report_<scan_timestamp>.json file
    """
    report_path = self.output_dir / f"scan_report_{scan_timestamp}.json"
    with report_path.open('w') as report_fh:
        json.dump(report, report_fh, indent=2)
    print(f"\nReport saved to: {report_path}", flush=True)
    return report_path
def generate_outputs(self, report: Dict[str, Any], scan_timestamp: str) -> Dict[str, Path]:
    """
    Generate all output formats: JSON, HTML report, and ZIP archive

    The JSON report is always written. HTML and ZIP generation are
    best-effort: a failure is printed to stderr and the corresponding key
    is left out of the result.

    Args:
        report: Scan report dictionary
        scan_timestamp: Timestamp string in format YYYYMMDD_HHMMSS

    Returns:
        Dictionary with paths to generated files. 'json' is always
        present; 'html' and 'zip' only when that step succeeded.
    """
    output_paths = {}

    # Step 1: Save JSON report
    print("\n" + "="*60, flush=True)
    print("Generating outputs...", flush=True)
    print("="*60, flush=True)
    json_path = self.save_report(report, scan_timestamp)
    output_paths['json'] = json_path

    # Step 2: Generate HTML report
    html_path = self.output_dir / f"scan_report_{scan_timestamp}.html"
    try:
        print(f"\nGenerating HTML report...", flush=True)
        # Templates are looked up in ../templates relative to this script
        template_dir = Path(__file__).parent.parent / 'templates'
        generator = HTMLReportGenerator(
            json_report_path=str(json_path),
            template_dir=str(template_dir)
        )
        html_result = generator.generate_report(output_path=str(html_path))
        output_paths['html'] = Path(html_result)
        print(f"HTML report saved to: {html_path}", flush=True)
    except Exception as e:
        print(f"Warning: HTML report generation failed: {e}", file=sys.stderr, flush=True)
        print(f"Continuing with JSON output only...", file=sys.stderr, flush=True)

    # Step 3: Create ZIP archive
    zip_path = self.output_dir / f"scan_report_{scan_timestamp}.zip"
    try:
        print(f"\nCreating ZIP archive...", flush=True)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(json_path, json_path.name)
            # Only include the HTML report if step 2 produced it
            if 'html' in output_paths and html_path.exists():
                zipf.write(html_path, html_path.name)
            screenshot_dir = self.output_dir / f"scan_report_{scan_timestamp}_screenshots"
            if screenshot_dir.is_dir():
                # Sorted so the archive layout is the same on every run;
                # the directory name is kept as a prefix inside the ZIP.
                for screenshot_file in sorted(screenshot_dir.iterdir()):
                    if screenshot_file.is_file():
                        arcname = f"{screenshot_dir.name}/{screenshot_file.name}"
                        zipf.write(screenshot_file, arcname)
        output_paths['zip'] = zip_path
        print(f"ZIP archive saved to: {zip_path}", flush=True)
    except Exception as e:
        print(f"Warning: ZIP archive creation failed: {e}", file=sys.stderr, flush=True)
        # Do not leave an incomplete archive behind that could be mistaken
        # for a full one. Removal is best-effort.
        try:
            zip_path.unlink(missing_ok=True)
        except OSError:
            pass
    return output_paths
def main():
    """
    Command-line entry point: parse arguments, scan, write all outputs

    Returns:
        0 when the scan and output generation complete, 1 when any
        exception is raised along the way (its message goes to stderr)
    """
    # Log records go to stderr, separate from the progress output on stdout
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler(sys.stderr)],
    )

    parser = argparse.ArgumentParser(description='SneakyScanner - Masscan-based network scanner')
    parser.add_argument('config', help='Path to YAML configuration file')
    parser.add_argument(
        '-o', '--output-dir',
        default='/app/output',
        help='Output directory for scan results (default: /app/output)',
    )
    args = parser.parse_args()

    try:
        scanner = SneakyScanner(args.config, args.output_dir)
        report, scan_timestamp = scanner.scan()
        output_paths = scanner.generate_outputs(report, scan_timestamp)

        rule = "="*60
        print("\n" + rule, flush=True)
        print("Scan completed successfully!", flush=True)
        print(rule, flush=True)
        # Outputs that were not produced are shown as N/A
        for label, key in ((" JSON Report:", 'json'),
                           (" HTML Report:", 'html'),
                           (" ZIP Archive:", 'zip')):
            print(f"{label} {output_paths.get(key, 'N/A')}", flush=True)
        print(rule, flush=True)
        return 0
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr, flush=True)
        return 1


if __name__ == '__main__':
    raise SystemExit(main())