init commit
This commit is contained in:
709
src/scanner.py
Normal file
709
src/scanner.py
Normal file
@@ -0,0 +1,709 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
SneakyScanner - Masscan-based network scanner with YAML configuration
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
import yaml
|
||||
from libnmap.process import NmapProcess
|
||||
from libnmap.parser import NmapParser
|
||||
|
||||
# Force unbuffered output for Docker
|
||||
sys.stdout.reconfigure(line_buffering=True)
|
||||
sys.stderr.reconfigure(line_buffering=True)
|
||||
|
||||
|
||||
class SneakyScanner:
|
||||
"""Wrapper for masscan to perform network scans based on YAML config"""
|
||||
|
||||
def __init__(self, config_path: str, output_dir: str = "/app/output"):
|
||||
self.config_path = Path(config_path)
|
||||
self.output_dir = Path(output_dir)
|
||||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.config = self._load_config()
|
||||
|
||||
def _load_config(self) -> Dict[str, Any]:
|
||||
"""Load and validate YAML configuration"""
|
||||
if not self.config_path.exists():
|
||||
raise FileNotFoundError(f"Config file not found: {self.config_path}")
|
||||
|
||||
with open(self.config_path, 'r') as f:
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
if not config.get('title'):
|
||||
raise ValueError("Config must include 'title' field")
|
||||
if not config.get('sites'):
|
||||
raise ValueError("Config must include 'sites' field")
|
||||
|
||||
return config
|
||||
|
||||
def _run_masscan(self, targets: List[str], ports: str, protocol: str) -> List[Dict]:
|
||||
"""
|
||||
Run masscan and return parsed results
|
||||
|
||||
Args:
|
||||
targets: List of IP addresses to scan
|
||||
ports: Port range string (e.g., "0-65535")
|
||||
protocol: "tcp" or "udp"
|
||||
"""
|
||||
if not targets:
|
||||
return []
|
||||
|
||||
# Create temporary file for targets
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
|
||||
f.write('\n'.join(targets))
|
||||
target_file = f.name
|
||||
|
||||
# Create temporary output file
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
|
||||
output_file = f.name
|
||||
|
||||
try:
|
||||
# Build command based on protocol
|
||||
if protocol == 'tcp':
|
||||
cmd = [
|
||||
'masscan',
|
||||
'-iL', target_file,
|
||||
'-p', ports,
|
||||
'--rate', '10000',
|
||||
'-oJ', output_file,
|
||||
'--wait', '0'
|
||||
]
|
||||
elif protocol == 'udp':
|
||||
cmd = [
|
||||
'masscan',
|
||||
'-iL', target_file,
|
||||
'--udp-ports', ports,
|
||||
'--rate', '10000',
|
||||
'-oJ', output_file,
|
||||
'--wait', '0'
|
||||
]
|
||||
else:
|
||||
raise ValueError(f"Invalid protocol: {protocol}")
|
||||
|
||||
print(f"Running: {' '.join(cmd)}", flush=True)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
print(f"Masscan {protocol.upper()} scan completed", flush=True)
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"Masscan stderr: {result.stderr}", file=sys.stderr)
|
||||
|
||||
# Parse masscan JSON output
|
||||
results = []
|
||||
with open(output_file, 'r') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#'):
|
||||
try:
|
||||
results.append(json.loads(line.rstrip(',')))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
return results
|
||||
|
||||
finally:
|
||||
# Cleanup temp files
|
||||
Path(target_file).unlink(missing_ok=True)
|
||||
Path(output_file).unlink(missing_ok=True)
|
||||
|
||||
def _run_ping_scan(self, targets: List[str]) -> Dict[str, bool]:
|
||||
"""
|
||||
Run ping scan using masscan ICMP echo
|
||||
|
||||
Returns:
|
||||
Dict mapping IP addresses to ping response status
|
||||
"""
|
||||
if not targets:
|
||||
return {}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
|
||||
f.write('\n'.join(targets))
|
||||
target_file = f.name
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
|
||||
output_file = f.name
|
||||
|
||||
try:
|
||||
cmd = [
|
||||
'masscan',
|
||||
'-iL', target_file,
|
||||
'--ping',
|
||||
'--rate', '10000',
|
||||
'-oJ', output_file,
|
||||
'--wait', '0'
|
||||
]
|
||||
|
||||
print(f"Running: {' '.join(cmd)}", flush=True)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
print(f"Masscan PING scan completed", flush=True)
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"Masscan stderr: {result.stderr}", file=sys.stderr, flush=True)
|
||||
|
||||
# Parse results
|
||||
responding_ips = set()
|
||||
with open(output_file, 'r') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#'):
|
||||
try:
|
||||
data = json.loads(line.rstrip(','))
|
||||
if 'ip' in data:
|
||||
responding_ips.add(data['ip'])
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
# Create result dict for all targets
|
||||
return {ip: (ip in responding_ips) for ip in targets}
|
||||
|
||||
finally:
|
||||
Path(target_file).unlink(missing_ok=True)
|
||||
Path(output_file).unlink(missing_ok=True)
|
||||
|
||||
def _run_nmap_service_detection(self, ip_ports: Dict[str, List[int]]) -> Dict[str, List[Dict]]:
|
||||
"""
|
||||
Run nmap service detection on discovered ports
|
||||
|
||||
Args:
|
||||
ip_ports: Dict mapping IP addresses to list of TCP ports
|
||||
|
||||
Returns:
|
||||
Dict mapping IP addresses to list of service info dicts
|
||||
"""
|
||||
if not ip_ports:
|
||||
return {}
|
||||
|
||||
all_services = {}
|
||||
|
||||
for ip, ports in ip_ports.items():
|
||||
if not ports:
|
||||
all_services[ip] = []
|
||||
continue
|
||||
|
||||
# Build port list string
|
||||
port_list = ','.join(map(str, sorted(ports)))
|
||||
|
||||
print(f" Scanning {ip} ports {port_list}...", flush=True)
|
||||
|
||||
# Create temporary output file for XML
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.xml') as f:
|
||||
xml_output = f.name
|
||||
|
||||
try:
|
||||
# Run nmap with service detection
|
||||
cmd = [
|
||||
'nmap',
|
||||
'-sV', # Service version detection
|
||||
'--version-intensity', '5', # Balanced speed/accuracy
|
||||
'-p', port_list,
|
||||
'-oX', xml_output, # XML output
|
||||
'--host-timeout', '5m', # Timeout per host
|
||||
ip
|
||||
]
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f" Nmap warning for {ip}: {result.stderr}", file=sys.stderr, flush=True)
|
||||
|
||||
# Parse XML output
|
||||
services = self._parse_nmap_xml(xml_output)
|
||||
all_services[ip] = services
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
print(f" Nmap timeout for {ip}, skipping service detection", file=sys.stderr, flush=True)
|
||||
all_services[ip] = []
|
||||
except Exception as e:
|
||||
print(f" Nmap error for {ip}: {e}", file=sys.stderr, flush=True)
|
||||
all_services[ip] = []
|
||||
finally:
|
||||
Path(xml_output).unlink(missing_ok=True)
|
||||
|
||||
return all_services
|
||||
|
||||
def _parse_nmap_xml(self, xml_file: str) -> List[Dict]:
|
||||
"""
|
||||
Parse nmap XML output to extract service information
|
||||
|
||||
Args:
|
||||
xml_file: Path to nmap XML output file
|
||||
|
||||
Returns:
|
||||
List of service info dictionaries
|
||||
"""
|
||||
services = []
|
||||
|
||||
try:
|
||||
tree = ET.parse(xml_file)
|
||||
root = tree.getroot()
|
||||
|
||||
# Find all ports
|
||||
for port_elem in root.findall('.//port'):
|
||||
port_id = port_elem.get('portid')
|
||||
protocol = port_elem.get('protocol', 'tcp')
|
||||
|
||||
# Get state
|
||||
state_elem = port_elem.find('state')
|
||||
if state_elem is None or state_elem.get('state') != 'open':
|
||||
continue
|
||||
|
||||
# Get service info
|
||||
service_elem = port_elem.find('service')
|
||||
if service_elem is not None:
|
||||
service_info = {
|
||||
'port': int(port_id),
|
||||
'protocol': protocol,
|
||||
'service': service_elem.get('name', 'unknown'),
|
||||
'product': service_elem.get('product', ''),
|
||||
'version': service_elem.get('version', ''),
|
||||
'extrainfo': service_elem.get('extrainfo', ''),
|
||||
'ostype': service_elem.get('ostype', '')
|
||||
}
|
||||
|
||||
# Clean up empty fields
|
||||
service_info = {k: v for k, v in service_info.items() if v}
|
||||
|
||||
services.append(service_info)
|
||||
else:
|
||||
# Port is open but no service info
|
||||
services.append({
|
||||
'port': int(port_id),
|
||||
'protocol': protocol,
|
||||
'service': 'unknown'
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
print(f" Error parsing nmap XML: {e}", file=sys.stderr, flush=True)
|
||||
|
||||
return services
|
||||
|
||||
def _is_likely_web_service(self, service: Dict) -> bool:
|
||||
"""
|
||||
Check if a service is likely HTTP/HTTPS based on nmap detection or common web ports
|
||||
|
||||
Args:
|
||||
service: Service dictionary from nmap results
|
||||
|
||||
Returns:
|
||||
True if service appears to be web-related
|
||||
"""
|
||||
# Check service name
|
||||
web_services = ['http', 'https', 'ssl', 'http-proxy', 'https-alt',
|
||||
'http-alt', 'ssl/http', 'ssl/https']
|
||||
service_name = service.get('service', '').lower()
|
||||
|
||||
if service_name in web_services:
|
||||
return True
|
||||
|
||||
# Check common non-standard web ports
|
||||
web_ports = [80, 443, 8000, 8006, 8008, 8080, 8081, 8443, 8888, 9443]
|
||||
port = service.get('port')
|
||||
|
||||
return port in web_ports
|
||||
|
||||
def _detect_http_https(self, ip: str, port: int, timeout: int = 5) -> str:
|
||||
"""
|
||||
Detect if a port is HTTP or HTTPS
|
||||
|
||||
Args:
|
||||
ip: IP address
|
||||
port: Port number
|
||||
timeout: Connection timeout in seconds
|
||||
|
||||
Returns:
|
||||
'http', 'https', or 'unknown'
|
||||
"""
|
||||
import socket
|
||||
import ssl as ssl_module
|
||||
|
||||
# Try HTTPS first
|
||||
try:
|
||||
context = ssl_module.create_default_context()
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl_module.CERT_NONE
|
||||
|
||||
with socket.create_connection((ip, port), timeout=timeout) as sock:
|
||||
with context.wrap_socket(sock, server_hostname=ip) as ssock:
|
||||
return 'https'
|
||||
except ssl_module.SSLError:
|
||||
# Not HTTPS, try HTTP
|
||||
pass
|
||||
except (socket.timeout, socket.error, ConnectionRefusedError):
|
||||
return 'unknown'
|
||||
|
||||
# Try HTTP
|
||||
try:
|
||||
with socket.create_connection((ip, port), timeout=timeout) as sock:
|
||||
sock.send(b'HEAD / HTTP/1.0\r\n\r\n')
|
||||
response = sock.recv(1024)
|
||||
if b'HTTP' in response:
|
||||
return 'http'
|
||||
except (socket.timeout, socket.error, ConnectionRefusedError):
|
||||
pass
|
||||
|
||||
return 'unknown'
|
||||
|
||||
def _analyze_ssl_tls(self, ip: str, port: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Analyze SSL/TLS configuration including certificate and supported versions
|
||||
|
||||
Args:
|
||||
ip: IP address
|
||||
port: Port number
|
||||
|
||||
Returns:
|
||||
Dictionary with certificate info and TLS version support
|
||||
"""
|
||||
from sslyze import (
|
||||
Scanner,
|
||||
ServerScanRequest,
|
||||
ServerNetworkLocation,
|
||||
ScanCommand,
|
||||
ScanCommandAttemptStatusEnum,
|
||||
ServerScanStatusEnum
|
||||
)
|
||||
from cryptography import x509
|
||||
from datetime import datetime
|
||||
|
||||
result = {
|
||||
'certificate': {},
|
||||
'tls_versions': {},
|
||||
'errors': []
|
||||
}
|
||||
|
||||
try:
|
||||
# Create server location
|
||||
server_location = ServerNetworkLocation(
|
||||
hostname=ip,
|
||||
port=port
|
||||
)
|
||||
|
||||
# Create scan request with all TLS version scans
|
||||
scan_request = ServerScanRequest(
|
||||
server_location=server_location,
|
||||
scan_commands={
|
||||
ScanCommand.CERTIFICATE_INFO,
|
||||
ScanCommand.SSL_2_0_CIPHER_SUITES,
|
||||
ScanCommand.SSL_3_0_CIPHER_SUITES,
|
||||
ScanCommand.TLS_1_0_CIPHER_SUITES,
|
||||
ScanCommand.TLS_1_1_CIPHER_SUITES,
|
||||
ScanCommand.TLS_1_2_CIPHER_SUITES,
|
||||
ScanCommand.TLS_1_3_CIPHER_SUITES,
|
||||
}
|
||||
)
|
||||
|
||||
# Run scan
|
||||
scanner = Scanner()
|
||||
scanner.queue_scans([scan_request])
|
||||
|
||||
# Process results
|
||||
for scan_result in scanner.get_results():
|
||||
if scan_result.scan_status != ServerScanStatusEnum.COMPLETED:
|
||||
result['errors'].append('Connection failed')
|
||||
return result
|
||||
|
||||
server_scan_result = scan_result.scan_result
|
||||
|
||||
# Extract certificate information
|
||||
cert_attempt = getattr(server_scan_result, 'certificate_info', None)
|
||||
if cert_attempt and cert_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
|
||||
cert_result = cert_attempt.result
|
||||
if cert_result.certificate_deployments:
|
||||
deployment = cert_result.certificate_deployments[0]
|
||||
leaf_cert = deployment.received_certificate_chain[0]
|
||||
|
||||
# Calculate days until expiry
|
||||
not_after = leaf_cert.not_valid_after_utc
|
||||
days_until_expiry = (not_after - datetime.now(not_after.tzinfo)).days
|
||||
|
||||
# Extract SANs
|
||||
sans = []
|
||||
try:
|
||||
san_ext = leaf_cert.extensions.get_extension_for_class(
|
||||
x509.SubjectAlternativeName
|
||||
)
|
||||
sans = [name.value for name in san_ext.value]
|
||||
except x509.ExtensionNotFound:
|
||||
pass
|
||||
|
||||
result['certificate'] = {
|
||||
'subject': leaf_cert.subject.rfc4514_string(),
|
||||
'issuer': leaf_cert.issuer.rfc4514_string(),
|
||||
'serial_number': str(leaf_cert.serial_number),
|
||||
'not_valid_before': leaf_cert.not_valid_before_utc.isoformat(),
|
||||
'not_valid_after': leaf_cert.not_valid_after_utc.isoformat(),
|
||||
'days_until_expiry': days_until_expiry,
|
||||
'sans': sans
|
||||
}
|
||||
|
||||
# Test TLS versions
|
||||
tls_attributes = {
|
||||
'TLS 1.0': 'tls_1_0_cipher_suites',
|
||||
'TLS 1.1': 'tls_1_1_cipher_suites',
|
||||
'TLS 1.2': 'tls_1_2_cipher_suites',
|
||||
'TLS 1.3': 'tls_1_3_cipher_suites'
|
||||
}
|
||||
|
||||
for version_name, attr_name in tls_attributes.items():
|
||||
tls_attempt = getattr(server_scan_result, attr_name, None)
|
||||
if tls_attempt and tls_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
|
||||
tls_result = tls_attempt.result
|
||||
supported = len(tls_result.accepted_cipher_suites) > 0
|
||||
cipher_suites = [
|
||||
suite.cipher_suite.name
|
||||
for suite in tls_result.accepted_cipher_suites
|
||||
]
|
||||
result['tls_versions'][version_name] = {
|
||||
'supported': supported,
|
||||
'cipher_suites': cipher_suites
|
||||
}
|
||||
else:
|
||||
result['tls_versions'][version_name] = {
|
||||
'supported': False,
|
||||
'cipher_suites': []
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
result['errors'].append(str(e))
|
||||
|
||||
return result
|
||||
|
||||
def _run_http_analysis(self, ip_services: Dict[str, List[Dict]]) -> Dict[str, Dict[int, Dict]]:
|
||||
"""
|
||||
Analyze HTTP/HTTPS services and SSL/TLS configuration
|
||||
|
||||
Args:
|
||||
ip_services: Dict mapping IP addresses to their service lists
|
||||
|
||||
Returns:
|
||||
Dict mapping IPs to port-specific HTTP analysis results
|
||||
"""
|
||||
if not ip_services:
|
||||
return {}
|
||||
|
||||
all_results = {}
|
||||
|
||||
for ip, services in ip_services.items():
|
||||
ip_results = {}
|
||||
|
||||
for service in services:
|
||||
if not self._is_likely_web_service(service):
|
||||
continue
|
||||
|
||||
port = service['port']
|
||||
print(f" Analyzing {ip}:{port}...", flush=True)
|
||||
|
||||
# Detect HTTP vs HTTPS
|
||||
protocol = self._detect_http_https(ip, port, timeout=5)
|
||||
|
||||
if protocol == 'unknown':
|
||||
continue
|
||||
|
||||
result = {'protocol': protocol}
|
||||
|
||||
# If HTTPS, analyze SSL/TLS
|
||||
if protocol == 'https':
|
||||
try:
|
||||
ssl_info = self._analyze_ssl_tls(ip, port)
|
||||
# Only include ssl_tls if we got meaningful data
|
||||
if ssl_info.get('certificate') or ssl_info.get('tls_versions'):
|
||||
result['ssl_tls'] = ssl_info
|
||||
elif ssl_info.get('errors'):
|
||||
# Log errors even if we don't include ssl_tls in output
|
||||
print(f" SSL/TLS analysis failed for {ip}:{port}: {ssl_info['errors']}",
|
||||
file=sys.stderr, flush=True)
|
||||
except Exception as e:
|
||||
print(f" SSL/TLS analysis error for {ip}:{port}: {e}",
|
||||
file=sys.stderr, flush=True)
|
||||
|
||||
ip_results[port] = result
|
||||
|
||||
if ip_results:
|
||||
all_results[ip] = ip_results
|
||||
|
||||
return all_results
|
||||
|
||||
def scan(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Perform complete scan based on configuration
|
||||
|
||||
Returns:
|
||||
Dictionary containing scan results
|
||||
"""
|
||||
print(f"Starting scan: {self.config['title']}", flush=True)
|
||||
print(f"Config: {self.config_path}", flush=True)
|
||||
|
||||
# Record start time
|
||||
start_time = time.time()
|
||||
|
||||
# Collect all unique IPs
|
||||
all_ips = set()
|
||||
ip_to_site = {}
|
||||
ip_expected = {}
|
||||
|
||||
for site in self.config['sites']:
|
||||
site_name = site['name']
|
||||
for ip_config in site['ips']:
|
||||
ip = ip_config['address']
|
||||
all_ips.add(ip)
|
||||
ip_to_site[ip] = site_name
|
||||
ip_expected[ip] = ip_config.get('expected', {})
|
||||
|
||||
all_ips = sorted(list(all_ips))
|
||||
print(f"Total IPs to scan: {len(all_ips)}", flush=True)
|
||||
|
||||
# Perform ping scan
|
||||
print(f"\n[1/5] Performing ping scan on {len(all_ips)} IPs...", flush=True)
|
||||
ping_results = self._run_ping_scan(all_ips)
|
||||
|
||||
# Perform TCP scan (all ports)
|
||||
print(f"\n[2/5] Performing TCP scan on {len(all_ips)} IPs (ports 0-65535)...", flush=True)
|
||||
tcp_results = self._run_masscan(all_ips, '0-65535', 'tcp')
|
||||
|
||||
# Perform UDP scan (all ports)
|
||||
print(f"\n[3/5] Performing UDP scan on {len(all_ips)} IPs (ports 0-65535)...", flush=True)
|
||||
udp_results = self._run_masscan(all_ips, '0-65535', 'udp')
|
||||
|
||||
# Organize results by IP
|
||||
results_by_ip = {}
|
||||
for ip in all_ips:
|
||||
results_by_ip[ip] = {
|
||||
'site': ip_to_site[ip],
|
||||
'expected': ip_expected[ip],
|
||||
'actual': {
|
||||
'ping': ping_results.get(ip, False),
|
||||
'tcp_ports': [],
|
||||
'udp_ports': [],
|
||||
'services': []
|
||||
}
|
||||
}
|
||||
|
||||
# Add TCP ports
|
||||
for result in tcp_results:
|
||||
ip = result.get('ip')
|
||||
port = result.get('ports', [{}])[0].get('port')
|
||||
if ip in results_by_ip and port:
|
||||
results_by_ip[ip]['actual']['tcp_ports'].append(port)
|
||||
|
||||
# Add UDP ports
|
||||
for result in udp_results:
|
||||
ip = result.get('ip')
|
||||
port = result.get('ports', [{}])[0].get('port')
|
||||
if ip in results_by_ip and port:
|
||||
results_by_ip[ip]['actual']['udp_ports'].append(port)
|
||||
|
||||
# Sort ports
|
||||
for ip in results_by_ip:
|
||||
results_by_ip[ip]['actual']['tcp_ports'].sort()
|
||||
results_by_ip[ip]['actual']['udp_ports'].sort()
|
||||
|
||||
# Perform service detection on TCP ports
|
||||
print(f"\n[4/5] Performing service detection on discovered TCP ports...", flush=True)
|
||||
ip_ports = {ip: results_by_ip[ip]['actual']['tcp_ports'] for ip in all_ips}
|
||||
service_results = self._run_nmap_service_detection(ip_ports)
|
||||
|
||||
# Add service information to results
|
||||
for ip, services in service_results.items():
|
||||
if ip in results_by_ip:
|
||||
results_by_ip[ip]['actual']['services'] = services
|
||||
|
||||
# Perform HTTP/HTTPS analysis on web services
|
||||
print(f"\n[5/5] Analyzing HTTP/HTTPS services and SSL/TLS configuration...", flush=True)
|
||||
http_results = self._run_http_analysis(service_results)
|
||||
|
||||
# Merge HTTP analysis into service results
|
||||
for ip, port_results in http_results.items():
|
||||
if ip in results_by_ip:
|
||||
for service in results_by_ip[ip]['actual']['services']:
|
||||
port = service['port']
|
||||
if port in port_results:
|
||||
service['http_info'] = port_results[port]
|
||||
|
||||
# Calculate scan duration
|
||||
end_time = time.time()
|
||||
scan_duration = round(end_time - start_time, 2)
|
||||
|
||||
# Build final report
|
||||
report = {
|
||||
'title': self.config['title'],
|
||||
'scan_time': datetime.utcnow().isoformat() + 'Z',
|
||||
'scan_duration': scan_duration,
|
||||
'config_file': str(self.config_path),
|
||||
'sites': []
|
||||
}
|
||||
|
||||
for site in self.config['sites']:
|
||||
site_result = {
|
||||
'name': site['name'],
|
||||
'ips': []
|
||||
}
|
||||
|
||||
for ip_config in site['ips']:
|
||||
ip = ip_config['address']
|
||||
site_result['ips'].append({
|
||||
'address': ip,
|
||||
'expected': ip_expected[ip],
|
||||
'actual': results_by_ip[ip]['actual']
|
||||
})
|
||||
|
||||
report['sites'].append(site_result)
|
||||
|
||||
return report
|
||||
|
||||
def save_report(self, report: Dict[str, Any]) -> Path:
|
||||
"""Save scan report to JSON file"""
|
||||
timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
|
||||
output_file = self.output_dir / f"scan_report_{timestamp}.json"
|
||||
|
||||
with open(output_file, 'w') as f:
|
||||
json.dump(report, f, indent=2)
|
||||
|
||||
print(f"\nReport saved to: {output_file}", flush=True)
|
||||
return output_file
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='SneakyScanner - Masscan-based network scanner'
|
||||
)
|
||||
parser.add_argument(
|
||||
'config',
|
||||
help='Path to YAML configuration file'
|
||||
)
|
||||
parser.add_argument(
|
||||
'-o', '--output-dir',
|
||||
default='/app/output',
|
||||
help='Output directory for scan results (default: /app/output)'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
scanner = SneakyScanner(args.config, args.output_dir)
|
||||
report = scanner.scan()
|
||||
output_file = scanner.save_report(report)
|
||||
|
||||
print("\n" + "="*60, flush=True)
|
||||
print("Scan completed successfully!", flush=True)
|
||||
print(f"Results: {output_file}", flush=True)
|
||||
print("="*60, flush=True)
|
||||
|
||||
return 0
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr, flush=True)
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user