commit 48755a853900e76d9b68a37384ed067dd477964d Author: Phillip Tarrant Date: Thu Nov 13 15:23:41 2025 +0000 init commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b37bb68 --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +# Output files +output/*.json + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +#AI helpers +.claude/ +CLAUDE.md + +# OS +.DS_Store +Thumbs.db + +# Docker +.dockerignore diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1f4fcad --- /dev/null +++ b/Dockerfile @@ -0,0 +1,41 @@ +FROM python:3.11-slim + +# Install system dependencies and masscan +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + git \ + build-essential \ + libpcap-dev \ + nmap \ + && rm -rf /var/lib/apt/lists/* + +# Build and install masscan from source +RUN git clone https://github.com/robertdavidgraham/masscan /tmp/masscan && \ + cd /tmp/masscan && \ + make && \ + make install && \ + cd / && \ + rm -rf /tmp/masscan + +# Set working directory +WORKDIR /app + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY src/ ./src/ + +# Create output directory +RUN mkdir -p /app/output + +# Make scanner executable +RUN chmod +x /app/src/scanner.py + +# Force Python unbuffered output +ENV PYTHONUNBUFFERED=1 + +# Set entry point with unbuffered Python +ENTRYPOINT ["python3", "-u", "/app/src/scanner.py"] +CMD ["--help"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..0450121 --- /dev/null +++ b/README.md @@ -0,0 +1,245 @@ +# SneakyScanner + +A dockerized network scanning tool that uses masscan for fast port discovery and nmap for service detection to perform comprehensive infrastructure audits. SneakyScanner accepts YAML-based configuration files to define sites, IPs, and expected network behavior, then generates machine-readable JSON reports with detailed service information. + +## Features + +- YAML-based configuration for defining scan targets and expectations +- Comprehensive scanning using masscan: + - Ping/ICMP echo detection + - TCP port scanning (all 65535 ports) + - UDP port scanning (all 65535 ports) +- Service detection using nmap: + - Identifies services running on discovered TCP ports + - Extracts product names and versions + - Provides detailed service information +- HTTP/HTTPS analysis and SSL/TLS security assessment: + - Detects HTTP vs HTTPS on web services + - Extracts SSL certificate details (subject, issuer, expiration, SANs) + - Calculates days until certificate expiration + - Tests TLS version support (TLS 1.0, 1.1, 1.2, 1.3) + - Lists accepted cipher suites for each TLS version +- JSON output format for easy post-processing +- Dockerized for consistent execution environment and root privilege isolation +- Compare actual vs. expected network behavior + +## Requirements + +- Docker +- Docker Compose (optional, for easier usage) + +## Quick Start + +### Using Docker Compose + +1. Create or modify a configuration file in `configs/`: + +```yaml +title: "My Infrastructure Scan" +sites: + - name: "Web Servers" + ips: + - address: "192.168.1.10" + expected: + ping: true + tcp_ports: [22, 80, 443] + udp_ports: [] +``` + +2. Build and run: + +```bash +docker-compose build +docker-compose up +``` + +3. Check results in the `output/` directory + +## Scan Performance + +SneakyScanner uses a five-phase approach for comprehensive scanning: + +1. **Ping Scan** (masscan): ICMP echo detection - ~1-2 seconds +2. **TCP Port Discovery** (masscan): Scans all 65535 TCP ports at 10,000 packets/second - ~13 seconds per 2 IPs +3. **UDP Port Discovery** (masscan): Scans all 65535 UDP ports at 10,000 packets/second - ~13 seconds per 2 IPs +4. **Service Detection** (nmap): Identifies services on discovered TCP ports - ~20-60 seconds per IP with open ports +5. **HTTP/HTTPS Analysis** (SSL/TLS): Detects web protocols and analyzes certificates - ~5-10 seconds per web service + +**Example**: Scanning 2 IPs with 10 open ports each (including 2-3 web services) typically takes 1.5-2.5 minutes total. + +### Using Docker Directly + +1. Build the image: + +```bash +docker build -t sneakyscanner . +``` + +2. Run a scan: + +```bash +docker run --rm --privileged --network host \ + -v $(pwd)/configs:/app/configs:ro \ + -v $(pwd)/output:/app/output \ + sneakyscanner /app/configs/your-config.yaml +``` + +## Configuration File Format + +The YAML configuration file defines the scan parameters: + +```yaml +title: "Scan Title" # Required: Report title +sites: # Required: List of sites to scan + - name: "Site Name" + ips: + - address: "192.168.1.10" + expected: + ping: true # Expected ping response + tcp_ports: [22, 80] # Expected TCP ports + udp_ports: [53] # Expected UDP ports +``` + +See `configs/example-site.yaml` for a complete example. + +## Output Format + +Scan results are saved as JSON files in the `output/` directory with timestamps. The report includes the total scan duration (in seconds) covering all phases: ping scan, TCP/UDP port discovery, and service detection. + +```json +{ + "title": "Sneaky Infra Scan", + "scan_time": "2024-01-15T10:30:00Z", + "scan_duration": 95.3, + "config_file": "/app/configs/example-site.yaml", + "sites": [ + { + "name": "Production Web Servers", + "ips": [ + { + "address": "192.168.1.10", + "expected": { + "ping": true, + "tcp_ports": [22, 80, 443], + "udp_ports": [53] + }, + "actual": { + "ping": true, + "tcp_ports": [22, 80, 443, 3000], + "udp_ports": [53], + "services": [ + { + "port": 22, + "protocol": "tcp", + "service": "ssh", + "product": "OpenSSH", + "version": "8.2p1" + }, + { + "port": 80, + "protocol": "tcp", + "service": "http", + "product": "nginx", + "version": "1.18.0", + "http_info": { + "protocol": "http" + } + }, + { + "port": 443, + "protocol": "tcp", + "service": "https", + "product": "nginx", + "http_info": { + "protocol": "https", + "ssl_tls": { + "certificate": { + "subject": "CN=example.com", + "issuer": "CN=Let's Encrypt Authority X3,O=Let's Encrypt,C=US", + "serial_number": "123456789012345678901234567890", + "not_valid_before": "2025-01-01T00:00:00+00:00", + "not_valid_after": "2025-04-01T23:59:59+00:00", + "days_until_expiry": 89, + "sans": ["example.com", "www.example.com"] + }, + "tls_versions": { + "TLS 1.0": { + "supported": false, + "cipher_suites": [] + }, + "TLS 1.1": { + "supported": false, + "cipher_suites": [] + }, + "TLS 1.2": { + "supported": true, + "cipher_suites": [ + "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" + ] + }, + "TLS 1.3": { + "supported": true, + "cipher_suites": [ + "TLS_AES_256_GCM_SHA384", + "TLS_AES_128_GCM_SHA256" + ] + } + } + } + } + }, + { + "port": 3000, + "protocol": "tcp", + "service": "http", + "product": "Node.js", + "http_info": { + "protocol": "http" + } + } + ] + } + } + ] + } + ] +} +``` + +## Project Structure + +``` +SneakyScanner/ +├── src/ +│ └── scanner.py # Main scanner application +├── configs/ +│ └── example-site.yaml # Example configuration +├── output/ # Scan results (JSON files) +├── Dockerfile +├── docker-compose.yml +├── requirements.txt +└── README.md +``` + +## Security Notice + +This tool requires: +- `--privileged` flag or `CAP_NET_RAW` capability for masscan and nmap raw socket access +- `--network host` for direct network access + +Only use this tool on networks you own or have explicit authorization to scan. Unauthorized network scanning may be illegal in your jurisdiction. + +## Future Enhancements + +- **Webpage Screenshots**: Capture screenshots of discovered web services for visual verification +- **HTML Report Generation**: Build comprehensive HTML reports from JSON output with: + - Service details and SSL/TLS information + - Visual comparison of expected vs. actual results + - Certificate expiration warnings + - TLS version compliance reports + - Embedded webpage screenshots +- **Comparison Reports**: Generate diff reports showing changes between scans +- **Email Notifications**: Alert on unexpected changes or certificate expirations +- **Scheduled Scanning**: Automated periodic scans with cron integration +- **Vulnerability Detection**: Integration with CVE databases for known vulnerabilities diff --git a/configs/example-site.yaml b/configs/example-site.yaml new file mode 100644 index 0000000..21a6869 --- /dev/null +++ b/configs/example-site.yaml @@ -0,0 +1,16 @@ +title: "Sneaky Infra Scan" +sites: + - name: "Production Web Servers" + ips: + - address: "10.10.20.4" + expected: + ping: true + tcp_ports: [22, 53, 80] + udp_ports: [53] + # Optional: specify expected services (detected automatically) + services: ["ssh", "domain", "http"] + - address: "10.10.20.11" + expected: + ping: true + tcp_ports: [22, 111, 3128, 8006] + udp_ports: [] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..9033646 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +version: '3.8' + +services: + scanner: + build: . + image: sneakyscanner:latest + container_name: sneakyscanner + privileged: true # Required for masscan raw socket access + network_mode: host # Required for network scanning + volumes: + - ./configs:/app/configs:ro + - ./output:/app/output + command: /app/configs/example-site.yaml diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..88ede05 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +PyYAML==6.0.1 +python-libnmap==0.7.3 +sslyze==6.0.0 diff --git a/src/scanner.py b/src/scanner.py new file mode 100644 index 0000000..2cb9d04 --- /dev/null +++ b/src/scanner.py @@ -0,0 +1,709 @@ +#!/usr/bin/env python3 +""" +SneakyScanner - Masscan-based network scanner with YAML configuration +""" + +import argparse +import json +import subprocess +import sys +import tempfile +import time +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Any +import xml.etree.ElementTree as ET + +import yaml +from libnmap.process import NmapProcess +from libnmap.parser import NmapParser + +# Force unbuffered output for Docker +sys.stdout.reconfigure(line_buffering=True) +sys.stderr.reconfigure(line_buffering=True) + + +class SneakyScanner: + """Wrapper for masscan to perform network scans based on YAML config""" + + def __init__(self, config_path: str, output_dir: str = "/app/output"): + self.config_path = Path(config_path) + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + self.config = self._load_config() + + def _load_config(self) -> Dict[str, Any]: + """Load and validate YAML configuration""" + if not self.config_path.exists(): + raise FileNotFoundError(f"Config file not found: {self.config_path}") + + with open(self.config_path, 'r') as f: + config = yaml.safe_load(f) + + if not config.get('title'): + raise ValueError("Config must include 'title' field") + if not config.get('sites'): + raise ValueError("Config must include 'sites' field") + + return config + + def _run_masscan(self, targets: List[str], ports: str, protocol: str) -> List[Dict]: + """ + Run masscan and return parsed results + + Args: + targets: List of IP addresses to scan + ports: Port range string (e.g., "0-65535") + protocol: "tcp" or "udp" + """ + if not targets: + return [] + + # Create temporary file for targets + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f: + f.write('\n'.join(targets)) + target_file = f.name + + # Create temporary output file + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: + output_file = f.name + + try: + # Build command based on protocol + if protocol == 'tcp': + cmd = [ + 'masscan', + '-iL', target_file, + '-p', ports, + '--rate', '10000', + '-oJ', output_file, + '--wait', '0' + ] + elif protocol == 'udp': + cmd = [ + 'masscan', + '-iL', target_file, + '--udp-ports', ports, + '--rate', '10000', + '-oJ', output_file, + '--wait', '0' + ] + else: + raise ValueError(f"Invalid protocol: {protocol}") + + print(f"Running: {' '.join(cmd)}", flush=True) + result = subprocess.run(cmd, capture_output=True, text=True) + print(f"Masscan {protocol.upper()} scan completed", flush=True) + + if result.returncode != 0: + print(f"Masscan stderr: {result.stderr}", file=sys.stderr) + + # Parse masscan JSON output + results = [] + with open(output_file, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + try: + results.append(json.loads(line.rstrip(','))) + except json.JSONDecodeError: + continue + + return results + + finally: + # Cleanup temp files + Path(target_file).unlink(missing_ok=True) + Path(output_file).unlink(missing_ok=True) + + def _run_ping_scan(self, targets: List[str]) -> Dict[str, bool]: + """ + Run ping scan using masscan ICMP echo + + Returns: + Dict mapping IP addresses to ping response status + """ + if not targets: + return {} + + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f: + f.write('\n'.join(targets)) + target_file = f.name + + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f: + output_file = f.name + + try: + cmd = [ + 'masscan', + '-iL', target_file, + '--ping', + '--rate', '10000', + '-oJ', output_file, + '--wait', '0' + ] + + print(f"Running: {' '.join(cmd)}", flush=True) + result = subprocess.run(cmd, capture_output=True, text=True) + print(f"Masscan PING scan completed", flush=True) + + if result.returncode != 0: + print(f"Masscan stderr: {result.stderr}", file=sys.stderr, flush=True) + + # Parse results + responding_ips = set() + with open(output_file, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + try: + data = json.loads(line.rstrip(',')) + if 'ip' in data: + responding_ips.add(data['ip']) + except json.JSONDecodeError: + continue + + # Create result dict for all targets + return {ip: (ip in responding_ips) for ip in targets} + + finally: + Path(target_file).unlink(missing_ok=True) + Path(output_file).unlink(missing_ok=True) + + def _run_nmap_service_detection(self, ip_ports: Dict[str, List[int]]) -> Dict[str, List[Dict]]: + """ + Run nmap service detection on discovered ports + + Args: + ip_ports: Dict mapping IP addresses to list of TCP ports + + Returns: + Dict mapping IP addresses to list of service info dicts + """ + if not ip_ports: + return {} + + all_services = {} + + for ip, ports in ip_ports.items(): + if not ports: + all_services[ip] = [] + continue + + # Build port list string + port_list = ','.join(map(str, sorted(ports))) + + print(f" Scanning {ip} ports {port_list}...", flush=True) + + # Create temporary output file for XML + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.xml') as f: + xml_output = f.name + + try: + # Run nmap with service detection + cmd = [ + 'nmap', + '-sV', # Service version detection + '--version-intensity', '5', # Balanced speed/accuracy + '-p', port_list, + '-oX', xml_output, # XML output + '--host-timeout', '5m', # Timeout per host + ip + ] + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=600) + + if result.returncode != 0: + print(f" Nmap warning for {ip}: {result.stderr}", file=sys.stderr, flush=True) + + # Parse XML output + services = self._parse_nmap_xml(xml_output) + all_services[ip] = services + + except subprocess.TimeoutExpired: + print(f" Nmap timeout for {ip}, skipping service detection", file=sys.stderr, flush=True) + all_services[ip] = [] + except Exception as e: + print(f" Nmap error for {ip}: {e}", file=sys.stderr, flush=True) + all_services[ip] = [] + finally: + Path(xml_output).unlink(missing_ok=True) + + return all_services + + def _parse_nmap_xml(self, xml_file: str) -> List[Dict]: + """ + Parse nmap XML output to extract service information + + Args: + xml_file: Path to nmap XML output file + + Returns: + List of service info dictionaries + """ + services = [] + + try: + tree = ET.parse(xml_file) + root = tree.getroot() + + # Find all ports + for port_elem in root.findall('.//port'): + port_id = port_elem.get('portid') + protocol = port_elem.get('protocol', 'tcp') + + # Get state + state_elem = port_elem.find('state') + if state_elem is None or state_elem.get('state') != 'open': + continue + + # Get service info + service_elem = port_elem.find('service') + if service_elem is not None: + service_info = { + 'port': int(port_id), + 'protocol': protocol, + 'service': service_elem.get('name', 'unknown'), + 'product': service_elem.get('product', ''), + 'version': service_elem.get('version', ''), + 'extrainfo': service_elem.get('extrainfo', ''), + 'ostype': service_elem.get('ostype', '') + } + + # Clean up empty fields + service_info = {k: v for k, v in service_info.items() if v} + + services.append(service_info) + else: + # Port is open but no service info + services.append({ + 'port': int(port_id), + 'protocol': protocol, + 'service': 'unknown' + }) + + except Exception as e: + print(f" Error parsing nmap XML: {e}", file=sys.stderr, flush=True) + + return services + + def _is_likely_web_service(self, service: Dict) -> bool: + """ + Check if a service is likely HTTP/HTTPS based on nmap detection or common web ports + + Args: + service: Service dictionary from nmap results + + Returns: + True if service appears to be web-related + """ + # Check service name + web_services = ['http', 'https', 'ssl', 'http-proxy', 'https-alt', + 'http-alt', 'ssl/http', 'ssl/https'] + service_name = service.get('service', '').lower() + + if service_name in web_services: + return True + + # Check common non-standard web ports + web_ports = [80, 443, 8000, 8006, 8008, 8080, 8081, 8443, 8888, 9443] + port = service.get('port') + + return port in web_ports + + def _detect_http_https(self, ip: str, port: int, timeout: int = 5) -> str: + """ + Detect if a port is HTTP or HTTPS + + Args: + ip: IP address + port: Port number + timeout: Connection timeout in seconds + + Returns: + 'http', 'https', or 'unknown' + """ + import socket + import ssl as ssl_module + + # Try HTTPS first + try: + context = ssl_module.create_default_context() + context.check_hostname = False + context.verify_mode = ssl_module.CERT_NONE + + with socket.create_connection((ip, port), timeout=timeout) as sock: + with context.wrap_socket(sock, server_hostname=ip) as ssock: + return 'https' + except ssl_module.SSLError: + # Not HTTPS, try HTTP + pass + except (socket.timeout, socket.error, ConnectionRefusedError): + return 'unknown' + + # Try HTTP + try: + with socket.create_connection((ip, port), timeout=timeout) as sock: + sock.send(b'HEAD / HTTP/1.0\r\n\r\n') + response = sock.recv(1024) + if b'HTTP' in response: + return 'http' + except (socket.timeout, socket.error, ConnectionRefusedError): + pass + + return 'unknown' + + def _analyze_ssl_tls(self, ip: str, port: int) -> Dict[str, Any]: + """ + Analyze SSL/TLS configuration including certificate and supported versions + + Args: + ip: IP address + port: Port number + + Returns: + Dictionary with certificate info and TLS version support + """ + from sslyze import ( + Scanner, + ServerScanRequest, + ServerNetworkLocation, + ScanCommand, + ScanCommandAttemptStatusEnum, + ServerScanStatusEnum + ) + from cryptography import x509 + from datetime import datetime + + result = { + 'certificate': {}, + 'tls_versions': {}, + 'errors': [] + } + + try: + # Create server location + server_location = ServerNetworkLocation( + hostname=ip, + port=port + ) + + # Create scan request with all TLS version scans + scan_request = ServerScanRequest( + server_location=server_location, + scan_commands={ + ScanCommand.CERTIFICATE_INFO, + ScanCommand.SSL_2_0_CIPHER_SUITES, + ScanCommand.SSL_3_0_CIPHER_SUITES, + ScanCommand.TLS_1_0_CIPHER_SUITES, + ScanCommand.TLS_1_1_CIPHER_SUITES, + ScanCommand.TLS_1_2_CIPHER_SUITES, + ScanCommand.TLS_1_3_CIPHER_SUITES, + } + ) + + # Run scan + scanner = Scanner() + scanner.queue_scans([scan_request]) + + # Process results + for scan_result in scanner.get_results(): + if scan_result.scan_status != ServerScanStatusEnum.COMPLETED: + result['errors'].append('Connection failed') + return result + + server_scan_result = scan_result.scan_result + + # Extract certificate information + cert_attempt = getattr(server_scan_result, 'certificate_info', None) + if cert_attempt and cert_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED: + cert_result = cert_attempt.result + if cert_result.certificate_deployments: + deployment = cert_result.certificate_deployments[0] + leaf_cert = deployment.received_certificate_chain[0] + + # Calculate days until expiry + not_after = leaf_cert.not_valid_after_utc + days_until_expiry = (not_after - datetime.now(not_after.tzinfo)).days + + # Extract SANs + sans = [] + try: + san_ext = leaf_cert.extensions.get_extension_for_class( + x509.SubjectAlternativeName + ) + sans = [name.value for name in san_ext.value] + except x509.ExtensionNotFound: + pass + + result['certificate'] = { + 'subject': leaf_cert.subject.rfc4514_string(), + 'issuer': leaf_cert.issuer.rfc4514_string(), + 'serial_number': str(leaf_cert.serial_number), + 'not_valid_before': leaf_cert.not_valid_before_utc.isoformat(), + 'not_valid_after': leaf_cert.not_valid_after_utc.isoformat(), + 'days_until_expiry': days_until_expiry, + 'sans': sans + } + + # Test TLS versions + tls_attributes = { + 'TLS 1.0': 'tls_1_0_cipher_suites', + 'TLS 1.1': 'tls_1_1_cipher_suites', + 'TLS 1.2': 'tls_1_2_cipher_suites', + 'TLS 1.3': 'tls_1_3_cipher_suites' + } + + for version_name, attr_name in tls_attributes.items(): + tls_attempt = getattr(server_scan_result, attr_name, None) + if tls_attempt and tls_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED: + tls_result = tls_attempt.result + supported = len(tls_result.accepted_cipher_suites) > 0 + cipher_suites = [ + suite.cipher_suite.name + for suite in tls_result.accepted_cipher_suites + ] + result['tls_versions'][version_name] = { + 'supported': supported, + 'cipher_suites': cipher_suites + } + else: + result['tls_versions'][version_name] = { + 'supported': False, + 'cipher_suites': [] + } + + except Exception as e: + result['errors'].append(str(e)) + + return result + + def _run_http_analysis(self, ip_services: Dict[str, List[Dict]]) -> Dict[str, Dict[int, Dict]]: + """ + Analyze HTTP/HTTPS services and SSL/TLS configuration + + Args: + ip_services: Dict mapping IP addresses to their service lists + + Returns: + Dict mapping IPs to port-specific HTTP analysis results + """ + if not ip_services: + return {} + + all_results = {} + + for ip, services in ip_services.items(): + ip_results = {} + + for service in services: + if not self._is_likely_web_service(service): + continue + + port = service['port'] + print(f" Analyzing {ip}:{port}...", flush=True) + + # Detect HTTP vs HTTPS + protocol = self._detect_http_https(ip, port, timeout=5) + + if protocol == 'unknown': + continue + + result = {'protocol': protocol} + + # If HTTPS, analyze SSL/TLS + if protocol == 'https': + try: + ssl_info = self._analyze_ssl_tls(ip, port) + # Only include ssl_tls if we got meaningful data + if ssl_info.get('certificate') or ssl_info.get('tls_versions'): + result['ssl_tls'] = ssl_info + elif ssl_info.get('errors'): + # Log errors even if we don't include ssl_tls in output + print(f" SSL/TLS analysis failed for {ip}:{port}: {ssl_info['errors']}", + file=sys.stderr, flush=True) + except Exception as e: + print(f" SSL/TLS analysis error for {ip}:{port}: {e}", + file=sys.stderr, flush=True) + + ip_results[port] = result + + if ip_results: + all_results[ip] = ip_results + + return all_results + + def scan(self) -> Dict[str, Any]: + """ + Perform complete scan based on configuration + + Returns: + Dictionary containing scan results + """ + print(f"Starting scan: {self.config['title']}", flush=True) + print(f"Config: {self.config_path}", flush=True) + + # Record start time + start_time = time.time() + + # Collect all unique IPs + all_ips = set() + ip_to_site = {} + ip_expected = {} + + for site in self.config['sites']: + site_name = site['name'] + for ip_config in site['ips']: + ip = ip_config['address'] + all_ips.add(ip) + ip_to_site[ip] = site_name + ip_expected[ip] = ip_config.get('expected', {}) + + all_ips = sorted(list(all_ips)) + print(f"Total IPs to scan: {len(all_ips)}", flush=True) + + # Perform ping scan + print(f"\n[1/5] Performing ping scan on {len(all_ips)} IPs...", flush=True) + ping_results = self._run_ping_scan(all_ips) + + # Perform TCP scan (all ports) + print(f"\n[2/5] Performing TCP scan on {len(all_ips)} IPs (ports 0-65535)...", flush=True) + tcp_results = self._run_masscan(all_ips, '0-65535', 'tcp') + + # Perform UDP scan (all ports) + print(f"\n[3/5] Performing UDP scan on {len(all_ips)} IPs (ports 0-65535)...", flush=True) + udp_results = self._run_masscan(all_ips, '0-65535', 'udp') + + # Organize results by IP + results_by_ip = {} + for ip in all_ips: + results_by_ip[ip] = { + 'site': ip_to_site[ip], + 'expected': ip_expected[ip], + 'actual': { + 'ping': ping_results.get(ip, False), + 'tcp_ports': [], + 'udp_ports': [], + 'services': [] + } + } + + # Add TCP ports + for result in tcp_results: + ip = result.get('ip') + port = result.get('ports', [{}])[0].get('port') + if ip in results_by_ip and port: + results_by_ip[ip]['actual']['tcp_ports'].append(port) + + # Add UDP ports + for result in udp_results: + ip = result.get('ip') + port = result.get('ports', [{}])[0].get('port') + if ip in results_by_ip and port: + results_by_ip[ip]['actual']['udp_ports'].append(port) + + # Sort ports + for ip in results_by_ip: + results_by_ip[ip]['actual']['tcp_ports'].sort() + results_by_ip[ip]['actual']['udp_ports'].sort() + + # Perform service detection on TCP ports + print(f"\n[4/5] Performing service detection on discovered TCP ports...", flush=True) + ip_ports = {ip: results_by_ip[ip]['actual']['tcp_ports'] for ip in all_ips} + service_results = self._run_nmap_service_detection(ip_ports) + + # Add service information to results + for ip, services in service_results.items(): + if ip in results_by_ip: + results_by_ip[ip]['actual']['services'] = services + + # Perform HTTP/HTTPS analysis on web services + print(f"\n[5/5] Analyzing HTTP/HTTPS services and SSL/TLS configuration...", flush=True) + http_results = self._run_http_analysis(service_results) + + # Merge HTTP analysis into service results + for ip, port_results in http_results.items(): + if ip in results_by_ip: + for service in results_by_ip[ip]['actual']['services']: + port = service['port'] + if port in port_results: + service['http_info'] = port_results[port] + + # Calculate scan duration + end_time = time.time() + scan_duration = round(end_time - start_time, 2) + + # Build final report + report = { + 'title': self.config['title'], + 'scan_time': datetime.utcnow().isoformat() + 'Z', + 'scan_duration': scan_duration, + 'config_file': str(self.config_path), + 'sites': [] + } + + for site in self.config['sites']: + site_result = { + 'name': site['name'], + 'ips': [] + } + + for ip_config in site['ips']: + ip = ip_config['address'] + site_result['ips'].append({ + 'address': ip, + 'expected': ip_expected[ip], + 'actual': results_by_ip[ip]['actual'] + }) + + report['sites'].append(site_result) + + return report + + def save_report(self, report: Dict[str, Any]) -> Path: + """Save scan report to JSON file""" + timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S') + output_file = self.output_dir / f"scan_report_{timestamp}.json" + + with open(output_file, 'w') as f: + json.dump(report, f, indent=2) + + print(f"\nReport saved to: {output_file}", flush=True) + return output_file + + +def main(): + parser = argparse.ArgumentParser( + description='SneakyScanner - Masscan-based network scanner' + ) + parser.add_argument( + 'config', + help='Path to YAML configuration file' + ) + parser.add_argument( + '-o', '--output-dir', + default='/app/output', + help='Output directory for scan results (default: /app/output)' + ) + + args = parser.parse_args() + + try: + scanner = SneakyScanner(args.config, args.output_dir) + report = scanner.scan() + output_file = scanner.save_report(report) + + print("\n" + "="*60, flush=True) + print("Scan completed successfully!", flush=True) + print(f"Results: {output_file}", flush=True) + print("="*60, flush=True) + + return 0 + + except Exception as e: + print(f"Error: {e}", file=sys.stderr, flush=True) + return 1 + + +if __name__ == '__main__': + sys.exit(main())