This commit addresses multiple issues with schedule management and updates documentation to reflect the transition from a YAML-based to a database-backed configuration system.

**Documentation Updates:**
- Update DEPLOYMENT.md to remove all references to YAML config files
- Document that all configurations are now stored in SQLite database
- Update API examples to use config IDs instead of YAML filenames
- Remove configs directory from backup/restore procedures
- Update volume management section to reflect database-only storage

**Cron Expression Handling:**
- Add comprehensive documentation for APScheduler cron format conversion
- Document that from_crontab() accepts standard format (Sunday=0) and converts automatically
- Add validate_cron_expression() helper method with detailed error messages
- Include helpful hints for day-of-week field errors in validation
- Fix all deprecated datetime.utcnow() calls, replace with datetime.now(timezone.utc)

**Timezone-Aware DateTime Fixes:**
- Fix "can't subtract offset-naive and offset-aware datetimes" error
- Add timezone awareness to croniter.get_next() return values
- Make _get_relative_time() defensive to handle both naive and aware datetimes
- Ensure all datetime comparisons use timezone-aware objects

**Schedule Edit UI Fixes:**
- Fix JavaScript error "Cannot set properties of null (setting 'value')"
- Change reference from non-existent 'config-id' to correct 'config-file' element
- Add config_name field to schedule API responses for better UX
- Eagerly load Schedule.config relationship using joinedload()
- Fix AttributeError: use schedule.config.title instead of .name
- Display config title and ID in schedule edit form

**Technical Details:**
- app/web/services/schedule_service.py: 6 datetime.utcnow() fixes, validation enhancements
- app/web/services/scheduler_service.py: Documentation, validation, timezone fixes
- app/web/templates/schedule_edit.html: JavaScript element reference fix
- docs/DEPLOYMENT.md: Complete rewrite of config management sections

Fixes scheduling for Sunday at midnight (cron: 0 0 * * 0)
Fixes schedule edit page JavaScript errors
Improves user experience with config title display
1330 lines
48 KiB
Python
1330 lines
48 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SneakyScanner - Masscan-based network scanner with YAML configuration
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Callable, Optional
|
|
import xml.etree.ElementTree as ET
|
|
|
|
import yaml
|
|
from libnmap.process import NmapProcess
|
|
from libnmap.parser import NmapParser
|
|
|
|
from src.screenshot_capture import ScreenshotCapture
|
|
from src.report_generator import HTMLReportGenerator
|
|
from web.config import NMAP_HOST_TIMEOUT
|
|
|
|
# Force unbuffered output for Docker
|
|
# Switch both standard streams to line buffering at import time so that each
# printed line is flushed promptly (needed for Docker log output).
for _stream in (sys.stdout, sys.stderr):
    _stream.reconfigure(line_buffering=True)
del _stream
|
|
|
|
|
|
class ScanCancelledError(Exception):
    """Signal that the user cancelled a scan while it was in progress."""
|
|
|
|
|
|
class SneakyScanner:
|
|
"""Wrapper for masscan to perform network scans based on YAML config or database config"""
|
|
|
|
def __init__(self, config_path: str = None, config_id: int = None, config_dict: Dict = None, output_dir: str = "/app/output"):
|
|
"""
|
|
Initialize scanner with configuration.
|
|
|
|
Args:
|
|
config_path: Path to YAML config file (legacy)
|
|
config_id: Database config ID (preferred)
|
|
config_dict: Config dictionary (for direct use)
|
|
output_dir: Output directory for scan results
|
|
|
|
Note: Provide exactly one of config_path, config_id, or config_dict
|
|
"""
|
|
if sum([config_path is not None, config_id is not None, config_dict is not None]) != 1:
|
|
raise ValueError("Must provide exactly one of: config_path, config_id, or config_dict")
|
|
|
|
self.config_path = Path(config_path) if config_path else None
|
|
self.config_id = config_id
|
|
self.output_dir = Path(output_dir)
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if config_dict:
|
|
self.config = config_dict
|
|
# Process sites: resolve references and expand CIDRs
|
|
if 'sites' in self.config:
|
|
self.config['sites'] = self._resolve_sites(self.config['sites'])
|
|
else:
|
|
self.config = self._load_config()
|
|
|
|
self.screenshot_capture = None
|
|
|
|
# Cancellation support
|
|
self._cancelled = False
|
|
self._cancel_lock = threading.Lock()
|
|
self._active_process = None
|
|
self._process_lock = threading.Lock()
|
|
|
|
def cancel(self):
|
|
"""
|
|
Cancel the running scan.
|
|
|
|
Terminates any active subprocess and sets cancellation flag.
|
|
"""
|
|
with self._cancel_lock:
|
|
self._cancelled = True
|
|
|
|
with self._process_lock:
|
|
if self._active_process and self._active_process.poll() is None:
|
|
try:
|
|
# Terminate the process group
|
|
os.killpg(os.getpgid(self._active_process.pid), signal.SIGTERM)
|
|
except (ProcessLookupError, OSError):
|
|
pass
|
|
|
|
def is_cancelled(self) -> bool:
|
|
"""Check if scan has been cancelled."""
|
|
with self._cancel_lock:
|
|
return self._cancelled
|
|
|
|
def _load_config(self) -> Dict[str, Any]:
|
|
"""
|
|
Load and validate configuration from file or database.
|
|
|
|
Supports three formats:
|
|
1. Legacy: Sites with explicit IP lists
|
|
2. Site references: Sites referencing database-stored sites
|
|
3. Inline CIDRs: Sites with CIDR ranges
|
|
"""
|
|
# Load from database if config_id provided
|
|
if self.config_id:
|
|
return self._load_config_from_database(self.config_id)
|
|
|
|
# Load from YAML file
|
|
if not self.config_path.exists():
|
|
raise FileNotFoundError(f"Config file not found: {self.config_path}")
|
|
|
|
with open(self.config_path, 'r') as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
if not config.get('title'):
|
|
raise ValueError("Config must include 'title' field")
|
|
if not config.get('sites'):
|
|
raise ValueError("Config must include 'sites' field")
|
|
|
|
# Process sites: resolve references and expand CIDRs
|
|
config['sites'] = self._resolve_sites(config['sites'])
|
|
|
|
return config
|
|
|
|
def _load_config_from_database(self, config_id: int) -> Dict[str, Any]:
|
|
"""
|
|
Load configuration from database by ID.
|
|
|
|
Args:
|
|
config_id: Database config ID
|
|
|
|
Returns:
|
|
Config dictionary with expanded sites
|
|
|
|
Raises:
|
|
ValueError: If config not found or invalid
|
|
"""
|
|
try:
|
|
# Import here to avoid circular dependencies and allow scanner to work standalone
|
|
import os
|
|
import sys
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from web.models import ScanConfig
|
|
|
|
# Create database session
|
|
db_url = os.environ.get('DATABASE_URL', 'sqlite:////app/data/sneakyscanner.db')
|
|
engine = create_engine(db_url)
|
|
Session = sessionmaker(bind=engine)
|
|
session = Session()
|
|
|
|
try:
|
|
# Load config from database
|
|
db_config = session.query(ScanConfig).filter_by(id=config_id).first()
|
|
|
|
if not db_config:
|
|
raise ValueError(f"Config with ID {config_id} not found in database")
|
|
|
|
# Build config dict with site references
|
|
config = {
|
|
'title': db_config.title,
|
|
'sites': []
|
|
}
|
|
|
|
# Add each site as a site_ref
|
|
for assoc in db_config.site_associations:
|
|
site = assoc.site
|
|
config['sites'].append({
|
|
'site_ref': site.name
|
|
})
|
|
|
|
# Process sites: resolve references and expand CIDRs
|
|
config['sites'] = self._resolve_sites(config['sites'])
|
|
|
|
return config
|
|
|
|
finally:
|
|
session.close()
|
|
|
|
except ImportError as e:
|
|
raise ValueError(f"Failed to load config from database (import error): {str(e)}")
|
|
except Exception as e:
|
|
raise ValueError(f"Failed to load config from database: {str(e)}")
|
|
|
|
def _resolve_sites(self, sites: List[Dict]) -> List[Dict]:
|
|
"""
|
|
Resolve site references and expand CIDRs to IP lists.
|
|
|
|
Converts all site formats into the legacy format (with explicit IPs)
|
|
for compatibility with the existing scan logic.
|
|
|
|
Args:
|
|
sites: List of site definitions from config
|
|
|
|
Returns:
|
|
List of sites with expanded IP lists
|
|
"""
|
|
import ipaddress
|
|
|
|
resolved_sites = []
|
|
|
|
for site_def in sites:
|
|
# Handle site references
|
|
if 'site_ref' in site_def:
|
|
site_ref = site_def['site_ref']
|
|
# Load site from database
|
|
site_data = self._load_site_from_database(site_ref)
|
|
if site_data:
|
|
resolved_sites.append(site_data)
|
|
else:
|
|
print(f"WARNING: Site reference '{site_ref}' not found in database", file=sys.stderr)
|
|
continue
|
|
|
|
# Handle inline CIDR definitions
|
|
if 'cidrs' in site_def:
|
|
site_name = site_def.get('name', 'Unknown Site')
|
|
expanded_ips = []
|
|
|
|
for cidr_def in site_def['cidrs']:
|
|
cidr = cidr_def['cidr']
|
|
expected_ping = cidr_def.get('expected_ping', False)
|
|
expected_tcp_ports = cidr_def.get('expected_tcp_ports', [])
|
|
expected_udp_ports = cidr_def.get('expected_udp_ports', [])
|
|
|
|
# Check if there are IP-level overrides (from database sites)
|
|
ip_overrides = cidr_def.get('ip_overrides', [])
|
|
override_map = {
|
|
override['ip_address']: override
|
|
for override in ip_overrides
|
|
}
|
|
|
|
# Expand CIDR to IP list
|
|
try:
|
|
network = ipaddress.ip_network(cidr, strict=False)
|
|
ip_list = [str(ip) for ip in network.hosts()]
|
|
|
|
# If network has only 1 address (like /32), hosts() returns empty
|
|
if not ip_list:
|
|
ip_list = [str(network.network_address)]
|
|
|
|
# Create IP config for each IP in the CIDR
|
|
for ip_address in ip_list:
|
|
# Check if this IP has an override
|
|
if ip_address in override_map:
|
|
override = override_map[ip_address]
|
|
ip_config = {
|
|
'address': ip_address,
|
|
'expected': {
|
|
'ping': override.get('expected_ping', expected_ping),
|
|
'tcp_ports': override.get('expected_tcp_ports', expected_tcp_ports),
|
|
'udp_ports': override.get('expected_udp_ports', expected_udp_ports)
|
|
}
|
|
}
|
|
else:
|
|
# Use CIDR-level defaults
|
|
ip_config = {
|
|
'address': ip_address,
|
|
'expected': {
|
|
'ping': expected_ping,
|
|
'tcp_ports': expected_tcp_ports,
|
|
'udp_ports': expected_udp_ports
|
|
}
|
|
}
|
|
|
|
expanded_ips.append(ip_config)
|
|
|
|
except ValueError as e:
|
|
print(f"WARNING: Invalid CIDR '{cidr}': {e}", file=sys.stderr)
|
|
continue
|
|
|
|
# Add expanded site
|
|
resolved_sites.append({
|
|
'name': site_name,
|
|
'ips': expanded_ips
|
|
})
|
|
continue
|
|
|
|
# Legacy format: already has 'ips' list
|
|
if 'ips' in site_def:
|
|
resolved_sites.append(site_def)
|
|
continue
|
|
|
|
print(f"WARNING: Site definition missing required fields: {site_def}", file=sys.stderr)
|
|
|
|
return resolved_sites
|
|
|
|
def _load_site_from_database(self, site_name: str) -> Dict[str, Any]:
|
|
"""
|
|
Load a site definition from the database.
|
|
|
|
IPs are pre-expanded in the database, so we just load them directly.
|
|
|
|
Args:
|
|
site_name: Name of the site to load
|
|
|
|
Returns:
|
|
Site definition dict with IPs, or None if not found
|
|
"""
|
|
try:
|
|
# Import database modules
|
|
import os
|
|
import sys
|
|
|
|
# Add parent directory to path if needed
|
|
parent_dir = str(Path(__file__).parent.parent)
|
|
if parent_dir not in sys.path:
|
|
sys.path.insert(0, parent_dir)
|
|
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker, joinedload
|
|
from web.models import Site
|
|
|
|
# Get database URL from environment
|
|
database_url = os.environ.get('DATABASE_URL', 'sqlite:///./sneakyscanner.db')
|
|
|
|
# Create engine and session
|
|
engine = create_engine(database_url)
|
|
Session = sessionmaker(bind=engine)
|
|
session = Session()
|
|
|
|
# Query site with all IPs (CIDRs are already expanded)
|
|
site = (
|
|
session.query(Site)
|
|
.options(joinedload(Site.ips))
|
|
.filter(Site.name == site_name)
|
|
.first()
|
|
)
|
|
|
|
if not site:
|
|
session.close()
|
|
return None
|
|
|
|
# Load all IPs directly from database (already expanded)
|
|
expanded_ips = []
|
|
|
|
for ip_obj in site.ips:
|
|
# Get settings from IP (no need to merge with CIDR defaults)
|
|
expected_ping = ip_obj.expected_ping if ip_obj.expected_ping is not None else False
|
|
expected_tcp_ports = json.loads(ip_obj.expected_tcp_ports) if ip_obj.expected_tcp_ports else []
|
|
expected_udp_ports = json.loads(ip_obj.expected_udp_ports) if ip_obj.expected_udp_ports else []
|
|
|
|
ip_config = {
|
|
'address': ip_obj.ip_address,
|
|
'expected': {
|
|
'ping': expected_ping,
|
|
'tcp_ports': expected_tcp_ports,
|
|
'udp_ports': expected_udp_ports
|
|
}
|
|
}
|
|
|
|
expanded_ips.append(ip_config)
|
|
|
|
session.close()
|
|
|
|
return {
|
|
'name': site.name,
|
|
'ips': expanded_ips
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"ERROR: Failed to load site '{site_name}' from database: {e}", file=sys.stderr)
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def _run_masscan(self, targets: List[str], ports: str, protocol: str) -> List[Dict]:
|
|
"""
|
|
Run masscan and return parsed results
|
|
|
|
Args:
|
|
targets: List of IP addresses to scan
|
|
ports: Port range string (e.g., "0-65535")
|
|
protocol: "tcp" or "udp"
|
|
"""
|
|
if not targets:
|
|
return []
|
|
|
|
# Create temporary file for targets
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
|
|
f.write('\n'.join(targets))
|
|
target_file = f.name
|
|
|
|
# Create temporary output file
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
|
|
output_file = f.name
|
|
|
|
try:
|
|
# Build command based on protocol
|
|
if protocol == 'tcp':
|
|
cmd = [
|
|
'masscan',
|
|
'-iL', target_file,
|
|
'-p', ports,
|
|
'--rate', '10000',
|
|
'-oJ', output_file,
|
|
'--wait', '0'
|
|
]
|
|
elif protocol == 'udp':
|
|
cmd = [
|
|
'masscan',
|
|
'-iL', target_file,
|
|
'--udp-ports', ports,
|
|
'--rate', '10000',
|
|
'-oJ', output_file,
|
|
'--wait', '0'
|
|
]
|
|
else:
|
|
raise ValueError(f"Invalid protocol: {protocol}")
|
|
|
|
print(f"Running: {' '.join(cmd)}", flush=True)
|
|
|
|
# Use Popen for cancellation support
|
|
with self._process_lock:
|
|
self._active_process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
start_new_session=True
|
|
)
|
|
|
|
stdout, stderr = self._active_process.communicate()
|
|
returncode = self._active_process.returncode
|
|
|
|
with self._process_lock:
|
|
self._active_process = None
|
|
|
|
# Check if cancelled
|
|
if self.is_cancelled():
|
|
return []
|
|
|
|
print(f"Masscan {protocol.upper()} scan completed", flush=True)
|
|
|
|
if returncode != 0:
|
|
print(f"Masscan stderr: {stderr}", file=sys.stderr)
|
|
|
|
# Parse masscan JSON output
|
|
results = []
|
|
with open(output_file, 'r') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and not line.startswith('#'):
|
|
try:
|
|
results.append(json.loads(line.rstrip(',')))
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
return results
|
|
|
|
finally:
|
|
# Cleanup temp files
|
|
Path(target_file).unlink(missing_ok=True)
|
|
Path(output_file).unlink(missing_ok=True)
|
|
|
|
def _run_ping_scan(self, targets: List[str]) -> Dict[str, bool]:
|
|
"""
|
|
Run ping scan using masscan ICMP echo
|
|
|
|
Returns:
|
|
Dict mapping IP addresses to ping response status
|
|
"""
|
|
if not targets:
|
|
return {}
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
|
|
f.write('\n'.join(targets))
|
|
target_file = f.name
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
|
|
output_file = f.name
|
|
|
|
try:
|
|
cmd = [
|
|
'masscan',
|
|
'-iL', target_file,
|
|
'--ping',
|
|
'--rate', '10000',
|
|
'-oJ', output_file,
|
|
'--wait', '0'
|
|
]
|
|
|
|
print(f"Running: {' '.join(cmd)}", flush=True)
|
|
|
|
# Use Popen for cancellation support
|
|
with self._process_lock:
|
|
self._active_process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
start_new_session=True
|
|
)
|
|
|
|
stdout, stderr = self._active_process.communicate()
|
|
returncode = self._active_process.returncode
|
|
|
|
with self._process_lock:
|
|
self._active_process = None
|
|
|
|
# Check if cancelled
|
|
if self.is_cancelled():
|
|
return {}
|
|
|
|
print(f"Masscan PING scan completed", flush=True)
|
|
|
|
if returncode != 0:
|
|
print(f"Masscan stderr: {stderr}", file=sys.stderr, flush=True)
|
|
|
|
# Parse results
|
|
responding_ips = set()
|
|
with open(output_file, 'r') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and not line.startswith('#'):
|
|
try:
|
|
data = json.loads(line.rstrip(','))
|
|
if 'ip' in data:
|
|
responding_ips.add(data['ip'])
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
# Create result dict for all targets
|
|
return {ip: (ip in responding_ips) for ip in targets}
|
|
|
|
finally:
|
|
Path(target_file).unlink(missing_ok=True)
|
|
Path(output_file).unlink(missing_ok=True)
|
|
|
|
def _run_nmap_service_detection(self, ip_ports: Dict[str, List[int]]) -> Dict[str, List[Dict]]:
|
|
"""
|
|
Run nmap service detection on discovered ports
|
|
|
|
Args:
|
|
ip_ports: Dict mapping IP addresses to list of TCP ports
|
|
|
|
Returns:
|
|
Dict mapping IP addresses to list of service info dicts
|
|
"""
|
|
if not ip_ports:
|
|
return {}
|
|
|
|
all_services = {}
|
|
|
|
for ip, ports in ip_ports.items():
|
|
# Check if cancelled before each host
|
|
if self.is_cancelled():
|
|
break
|
|
|
|
if not ports:
|
|
all_services[ip] = []
|
|
continue
|
|
|
|
# Build port list string
|
|
port_list = ','.join(map(str, sorted(ports)))
|
|
|
|
print(f" Scanning {ip} ports {port_list}...", flush=True)
|
|
|
|
# Create temporary output file for XML
|
|
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.xml') as f:
|
|
xml_output = f.name
|
|
|
|
try:
|
|
# Run nmap with service detection
|
|
cmd = [
|
|
'nmap',
|
|
'-sV', # Service version detection
|
|
'--version-intensity', '5', # Balanced speed/accuracy
|
|
'-p', port_list,
|
|
'-oX', xml_output, # XML output
|
|
'--host-timeout', NMAP_HOST_TIMEOUT, # Timeout per host
|
|
ip
|
|
]
|
|
|
|
# Use Popen for cancellation support
|
|
with self._process_lock:
|
|
self._active_process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
start_new_session=True
|
|
)
|
|
|
|
stdout, stderr = self._active_process.communicate(timeout=600)
|
|
returncode = self._active_process.returncode
|
|
|
|
with self._process_lock:
|
|
self._active_process = None
|
|
|
|
# Check if cancelled
|
|
if self.is_cancelled():
|
|
Path(xml_output).unlink(missing_ok=True)
|
|
break
|
|
|
|
if returncode != 0:
|
|
print(f" Nmap warning for {ip}: {stderr}", file=sys.stderr, flush=True)
|
|
|
|
# Parse XML output
|
|
services = self._parse_nmap_xml(xml_output)
|
|
all_services[ip] = services
|
|
|
|
except subprocess.TimeoutExpired:
|
|
print(f" Nmap timeout for {ip}, skipping service detection", file=sys.stderr, flush=True)
|
|
all_services[ip] = []
|
|
except Exception as e:
|
|
print(f" Nmap error for {ip}: {e}", file=sys.stderr, flush=True)
|
|
all_services[ip] = []
|
|
finally:
|
|
Path(xml_output).unlink(missing_ok=True)
|
|
|
|
return all_services
|
|
|
|
def _parse_nmap_xml(self, xml_file: str) -> List[Dict]:
|
|
"""
|
|
Parse nmap XML output to extract service information
|
|
|
|
Args:
|
|
xml_file: Path to nmap XML output file
|
|
|
|
Returns:
|
|
List of service info dictionaries
|
|
"""
|
|
services = []
|
|
|
|
try:
|
|
tree = ET.parse(xml_file)
|
|
root = tree.getroot()
|
|
|
|
# Find all ports
|
|
for port_elem in root.findall('.//port'):
|
|
port_id = port_elem.get('portid')
|
|
protocol = port_elem.get('protocol', 'tcp')
|
|
|
|
# Get state
|
|
state_elem = port_elem.find('state')
|
|
if state_elem is None or state_elem.get('state') != 'open':
|
|
continue
|
|
|
|
# Get service info
|
|
service_elem = port_elem.find('service')
|
|
if service_elem is not None:
|
|
service_info = {
|
|
'port': int(port_id),
|
|
'protocol': protocol,
|
|
'service': service_elem.get('name', 'unknown'),
|
|
'product': service_elem.get('product', ''),
|
|
'version': service_elem.get('version', ''),
|
|
'extrainfo': service_elem.get('extrainfo', ''),
|
|
'ostype': service_elem.get('ostype', '')
|
|
}
|
|
|
|
# Clean up empty fields
|
|
service_info = {k: v for k, v in service_info.items() if v}
|
|
|
|
services.append(service_info)
|
|
else:
|
|
# Port is open but no service info
|
|
services.append({
|
|
'port': int(port_id),
|
|
'protocol': protocol,
|
|
'service': 'unknown'
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f" Error parsing nmap XML: {e}", file=sys.stderr, flush=True)
|
|
|
|
return services
|
|
|
|
def _is_likely_web_service(self, service: Dict, ip: str = None) -> bool:
|
|
"""
|
|
Check if a service is a web server by actually making an HTTP request
|
|
|
|
Args:
|
|
service: Service dictionary from nmap results
|
|
ip: IP address to test (required for HTTP probe)
|
|
|
|
Returns:
|
|
True if service responds to HTTP/HTTPS requests
|
|
"""
|
|
import requests
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
# Quick check for known web service names first
|
|
web_services = ['http', 'https', 'ssl', 'http-proxy', 'https-alt',
|
|
'http-alt', 'ssl/http', 'ssl/https']
|
|
service_name = service.get('service', '').lower()
|
|
|
|
# If no IP provided, can't do HTTP probe
|
|
port = service.get('port')
|
|
if not ip or not port:
|
|
# check just the service if no IP - honestly shouldn't get here, but just incase...
|
|
if service_name in web_services:
|
|
return True
|
|
return False
|
|
|
|
# Actually try to connect - this is the definitive test
|
|
# Try HTTPS first, then HTTP
|
|
for protocol in ['https', 'http']:
|
|
url = f"{protocol}://{ip}:{port}/"
|
|
try:
|
|
response = requests.get(
|
|
url,
|
|
timeout=3,
|
|
verify=False,
|
|
allow_redirects=False
|
|
)
|
|
# Any status code means it's a web server
|
|
# (including 404, 500, etc. - still a web server)
|
|
return True
|
|
except requests.exceptions.SSLError:
|
|
# SSL error on HTTPS, try HTTP next
|
|
continue
|
|
except (requests.exceptions.ConnectionError,
|
|
requests.exceptions.Timeout,
|
|
requests.exceptions.RequestException):
|
|
continue
|
|
|
|
return False
|
|
|
|
def _detect_http_https(self, ip: str, port: int, timeout: int = 5) -> str:
|
|
"""
|
|
Detect if a port is HTTP or HTTPS
|
|
|
|
Args:
|
|
ip: IP address
|
|
port: Port number
|
|
timeout: Connection timeout in seconds
|
|
|
|
Returns:
|
|
'http', 'https', or 'unknown'
|
|
"""
|
|
import socket
|
|
import ssl as ssl_module
|
|
|
|
# Try HTTPS first
|
|
try:
|
|
context = ssl_module.create_default_context()
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl_module.CERT_NONE
|
|
|
|
with socket.create_connection((ip, port), timeout=timeout) as sock:
|
|
with context.wrap_socket(sock, server_hostname=ip) as ssock:
|
|
return 'https'
|
|
except ssl_module.SSLError:
|
|
# Not HTTPS, try HTTP
|
|
pass
|
|
except (socket.timeout, socket.error, ConnectionRefusedError):
|
|
return 'unknown'
|
|
|
|
# Try HTTP
|
|
try:
|
|
with socket.create_connection((ip, port), timeout=timeout) as sock:
|
|
sock.send(b'HEAD / HTTP/1.0\r\n\r\n')
|
|
response = sock.recv(1024)
|
|
if b'HTTP' in response:
|
|
return 'http'
|
|
except (socket.timeout, socket.error, ConnectionRefusedError):
|
|
pass
|
|
|
|
return 'unknown'
|
|
|
|
def _analyze_ssl_tls(self, ip: str, port: int) -> Dict[str, Any]:
    """
    Inspect a TLS endpoint with sslyze: leaf certificate plus TLS version support.

    Args:
        ip: IP address
        port: Port number

    Returns:
        Dictionary with three keys:
        - 'certificate': leaf certificate details (empty if unavailable)
        - 'tls_versions': per-version support flag and accepted cipher suites
          for TLS 1.0 through TLS 1.3
        - 'errors': list of error strings; any exception raised during the
          scan is recorded here rather than propagated
    """
    from sslyze import (
        Scanner,
        ServerScanRequest,
        ServerNetworkLocation,
        ScanCommand,
        ScanCommandAttemptStatusEnum,
        ServerScanStatusEnum
    )
    from cryptography import x509
    from datetime import datetime

    report = {
        'certificate': {},
        'tls_versions': {},
        'errors': []
    }

    try:
        location = ServerNetworkLocation(hostname=ip, port=port)

        # One request covering the certificate and every SSL/TLS version scan
        request = ServerScanRequest(
            server_location=location,
            scan_commands={
                ScanCommand.CERTIFICATE_INFO,
                ScanCommand.SSL_2_0_CIPHER_SUITES,
                ScanCommand.SSL_3_0_CIPHER_SUITES,
                ScanCommand.TLS_1_0_CIPHER_SUITES,
                ScanCommand.TLS_1_1_CIPHER_SUITES,
                ScanCommand.TLS_1_2_CIPHER_SUITES,
                ScanCommand.TLS_1_3_CIPHER_SUITES,
            }
        )

        scanner = Scanner()
        scanner.queue_scans([request])

        for outcome in scanner.get_results():
            if outcome.scan_status != ServerScanStatusEnum.COMPLETED:
                report['errors'].append('Connection failed')
                return report

            details = outcome.scan_result

            # --- Certificate ---
            cert_attempt = getattr(details, 'certificate_info', None)
            if cert_attempt and cert_attempt.status == ScanCommandAttemptStatusEnum.COMPLETED:
                cert_info = cert_attempt.result
                if cert_info.certificate_deployments:
                    first_deployment = cert_info.certificate_deployments[0]
                    leaf = first_deployment.received_certificate_chain[0]

                    # Days left, measured in the expiry timestamp's own timezone
                    expires_at = leaf.not_valid_after_utc
                    remaining_days = (expires_at - datetime.now(expires_at.tzinfo)).days

                    # Subject alternative names, when the extension exists
                    alt_names = []
                    try:
                        san_extension = leaf.extensions.get_extension_for_class(
                            x509.SubjectAlternativeName
                        )
                        alt_names = [entry.value for entry in san_extension.value]
                    except x509.ExtensionNotFound:
                        pass

                    report['certificate'] = {
                        'subject': leaf.subject.rfc4514_string(),
                        'issuer': leaf.issuer.rfc4514_string(),
                        'serial_number': str(leaf.serial_number),
                        'not_valid_before': leaf.not_valid_before_utc.isoformat(),
                        'not_valid_after': leaf.not_valid_after_utc.isoformat(),
                        'days_until_expiry': remaining_days,
                        'sans': alt_names
                    }

            # --- TLS versions ---
            version_attrs = (
                ('TLS 1.0', 'tls_1_0_cipher_suites'),
                ('TLS 1.1', 'tls_1_1_cipher_suites'),
                ('TLS 1.2', 'tls_1_2_cipher_suites'),
                ('TLS 1.3', 'tls_1_3_cipher_suites'),
            )

            for label, attr in version_attrs:
                attempt = getattr(details, attr, None)
                if not (attempt and attempt.status == ScanCommandAttemptStatusEnum.COMPLETED):
                    # Scan for this version did not complete: report as unsupported
                    report['tls_versions'][label] = {
                        'supported': False,
                        'cipher_suites': []
                    }
                    continue

                accepted = attempt.result.accepted_cipher_suites
                is_supported = len(accepted) > 0
                suite_names = [item.cipher_suite.name for item in accepted]
                report['tls_versions'][label] = {
                    'supported': is_supported,
                    'cipher_suites': suite_names
                }

    except Exception as e:
        report['errors'].append(str(e))

    return report
|
|
|
|
def _run_http_analysis(self, ip_services: Dict[str, List[Dict]]) -> Dict[str, Dict[int, Dict]]:
|
|
"""
|
|
Analyze HTTP/HTTPS services and SSL/TLS configuration
|
|
|
|
Args:
|
|
ip_services: Dict mapping IP addresses to their service lists
|
|
|
|
Returns:
|
|
Dict mapping IPs to port-specific HTTP analysis results
|
|
"""
|
|
if not ip_services:
|
|
return {}
|
|
|
|
all_results = {}
|
|
|
|
for ip, services in ip_services.items():
|
|
ip_results = {}
|
|
|
|
for service in services:
|
|
if not self._is_likely_web_service(service, ip):
|
|
continue
|
|
|
|
port = service['port']
|
|
print(f" Analyzing {ip}:{port}...", flush=True)
|
|
|
|
# Detect HTTP vs HTTPS
|
|
protocol = self._detect_http_https(ip, port, timeout=5)
|
|
|
|
if protocol == 'unknown':
|
|
continue
|
|
|
|
result = {'protocol': protocol}
|
|
|
|
# Capture screenshot if screenshot capture is enabled
|
|
if self.screenshot_capture:
|
|
try:
|
|
screenshot_path = self.screenshot_capture.capture(ip, port, protocol)
|
|
if screenshot_path:
|
|
result['screenshot'] = screenshot_path
|
|
except Exception as e:
|
|
print(f" Screenshot capture error for {ip}:{port}: {e}",
|
|
file=sys.stderr, flush=True)
|
|
|
|
# If HTTPS, analyze SSL/TLS
|
|
if protocol == 'https':
|
|
try:
|
|
ssl_info = self._analyze_ssl_tls(ip, port)
|
|
# Only include ssl_tls if we got meaningful data
|
|
if ssl_info.get('certificate') or ssl_info.get('tls_versions'):
|
|
result['ssl_tls'] = ssl_info
|
|
elif ssl_info.get('errors'):
|
|
# Log errors even if we don't include ssl_tls in output
|
|
print(f" SSL/TLS analysis failed for {ip}:{port}: {ssl_info['errors']}",
|
|
file=sys.stderr, flush=True)
|
|
except Exception as e:
|
|
print(f" SSL/TLS analysis error for {ip}:{port}: {e}",
|
|
file=sys.stderr, flush=True)
|
|
|
|
ip_results[port] = result
|
|
|
|
if ip_results:
|
|
all_results[ip] = ip_results
|
|
|
|
return all_results
|
|
|
|
def scan(self, progress_callback: Optional[Callable] = None) -> tuple:
    """
    Perform complete scan based on configuration

    Runs the phases in order: ping scan, TCP scan (ports 0-65535), optional
    UDP scan, nmap service detection on discovered TCP ports, and HTTP/HTTPS
    analysis. The UDP phase is controlled by the ``UDP_SCAN_ENABLED``
    (``'true'`` to enable) and ``UDP_PORTS`` environment variables.

    Args:
        progress_callback: Optional callback function for progress updates.
            Called with (phase, ip, data) where:
            - phase: 'init', 'ping', 'tcp_scan', 'udp_scan', 'service_detection', 'http_analysis'
            - ip: always None here (only phase-level updates are emitted)
            - data: Dict with progress data (results, counts, etc.)
            Note that the combined TCP/UDP port results are reported under
            the 'tcp_scan' phase with status 'completed'.

    Returns:
        Tuple ``(report, scan_timestamp)``: the report dictionary (title,
        scan_time, scan_duration, config_id, sites) and the
        ``YYYYMMDD_HHMMSS`` timestamp string used to name output artifacts.

    Raises:
        ScanCancelledError: if ``self.is_cancelled()`` is true after the
            ping, TCP, UDP or service detection phase.
    """
    # Local import so this block does not rely on the module-level
    # ``datetime`` import also exposing ``timezone``.
    from datetime import timezone

    def notify(phase, data):
        """Forward a phase-level progress update when a callback was given."""
        if progress_callback:
            progress_callback(phase, None, data)

    def abort_if_cancelled():
        """Raise ScanCancelledError once cancellation has been requested."""
        if self.is_cancelled():
            print("\nScan cancelled by user", flush=True)
            raise ScanCancelledError("Scan cancelled by user")

    def first_port(record):
        """Return the port of a masscan record's first 'ports' entry, or None."""
        # A missing, None or empty 'ports' list must not raise.
        entries = record.get('ports') or [{}]
        return entries[0].get('port')

    print(f"Starting scan: {self.config['title']}", flush=True)
    if self.config_id:
        print(f"Config ID: {self.config_id}", flush=True)
    elif self.config_path:
        print(f"Config: {self.config_path}", flush=True)

    # Record start time; the (local-time) timestamp names all output files.
    start_time = time.time()
    scan_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # Initialize screenshot capture
    self.screenshot_capture = ScreenshotCapture(
        output_dir=str(self.output_dir),
        scan_timestamp=scan_timestamp,
        timeout=15
    )

    # Everything below runs under try/finally so the screenshot browser is
    # released on cancellation or failure, not only on success.
    try:
        # Collect all unique IPs together with their site and expectations
        ip_to_site = {}
        ip_expected = {}

        for site in self.config['sites']:
            site_name = site['name']
            for ip_config in site['ips']:
                ip = ip_config['address']
                ip_to_site[ip] = site_name
                ip_expected[ip] = ip_config.get('expected', {})

        all_ips = sorted(ip_to_site)
        print(f"Total IPs to scan: {len(all_ips)}", flush=True)

        # Report initialization with total IP count
        notify('init', {
            'total_ips': len(all_ips),
            'ip_to_site': ip_to_site
        })

        # Perform ping scan
        print(f"\n[1/5] Performing ping scan on {len(all_ips)} IPs...", flush=True)
        notify('ping', {'status': 'starting'})
        ping_results = self._run_ping_scan(all_ips)
        abort_if_cancelled()

        # Report ping results
        notify('ping', {
            'status': 'completed',
            'results': ping_results
        })

        # Perform TCP scan (all ports)
        print(f"\n[2/5] Performing TCP scan on {len(all_ips)} IPs (ports 0-65535)...", flush=True)
        notify('tcp_scan', {'status': 'starting'})
        tcp_results = self._run_masscan(all_ips, '0-65535', 'tcp')
        abort_if_cancelled()

        # Perform UDP scan (if enabled)
        udp_enabled = os.environ.get('UDP_SCAN_ENABLED', 'false').lower() == 'true'
        udp_ports = os.environ.get('UDP_PORTS', '53,67,68,69,123,161,500,514,1900')

        if udp_enabled:
            print(f"\n[3/5] Performing UDP scan on {len(all_ips)} IPs (ports {udp_ports})...", flush=True)
            notify('udp_scan', {'status': 'starting'})
            udp_results = self._run_masscan(all_ips, udp_ports, 'udp')
            abort_if_cancelled()
        else:
            print(f"\n[3/5] Skipping UDP scan (disabled)...", flush=True)
            notify('udp_scan', {'status': 'skipped'})
            udp_results = []

        # Organize results by IP
        results_by_ip = {
            ip: {
                'site': ip_to_site[ip],
                'expected': ip_expected[ip],
                'actual': {
                    'ping': ping_results.get(ip, False),
                    'tcp_ports': [],
                    'udp_ports': [],
                    'services': []
                }
            }
            for ip in all_ips
        }

        # Add TCP and UDP ports. Compare against None rather than relying on
        # truthiness so that port 0 (part of the scanned range) is kept.
        for records, key in ((tcp_results, 'tcp_ports'), (udp_results, 'udp_ports')):
            for record in records:
                ip = record.get('ip')
                port = first_port(record)
                if ip in results_by_ip and port is not None:
                    results_by_ip[ip]['actual'][key].append(port)

        # Sort ports
        for entry in results_by_ip.values():
            entry['actual']['tcp_ports'].sort()
            entry['actual']['udp_ports'].sort()

        # Report TCP/UDP scan results with discovered ports per IP
        if progress_callback:
            tcp_udp_results = {
                ip: {
                    'tcp_ports': results_by_ip[ip]['actual']['tcp_ports'],
                    'udp_ports': results_by_ip[ip]['actual']['udp_ports']
                }
                for ip in all_ips
            }
            notify('tcp_scan', {
                'status': 'completed',
                'results': tcp_udp_results
            })

        # Perform service detection on TCP ports
        print(f"\n[4/5] Performing service detection on discovered TCP ports...", flush=True)
        notify('service_detection', {'status': 'starting'})
        ip_ports = {ip: results_by_ip[ip]['actual']['tcp_ports'] for ip in all_ips}
        service_results = self._run_nmap_service_detection(ip_ports)
        abort_if_cancelled()

        # Add service information to results
        for ip, services in service_results.items():
            if ip in results_by_ip:
                results_by_ip[ip]['actual']['services'] = services

        # Report service detection results
        notify('service_detection', {
            'status': 'completed',
            'results': service_results
        })

        # Perform HTTP/HTTPS analysis on web services
        print(f"\n[5/5] Analyzing HTTP/HTTPS services and SSL/TLS configuration...", flush=True)
        notify('http_analysis', {'status': 'starting'})
        http_results = self._run_http_analysis(service_results)

        # Report HTTP analysis completion
        notify('http_analysis', {
            'status': 'completed',
            'results': http_results
        })

        # Merge HTTP analysis into service results
        for ip, port_results in http_results.items():
            if ip in results_by_ip:
                for service in results_by_ip[ip]['actual']['services']:
                    port = service['port']
                    if port in port_results:
                        service['http_info'] = port_results[port]

        # Calculate scan duration
        scan_duration = round(time.time() - start_time, 2)

        # Build final report. scan_time keeps the historical
        # "<ISO-8601>Z" shape while using a timezone-aware UTC clock
        # instead of the deprecated datetime.utcnow().
        report = {
            'title': self.config['title'],
            'scan_time': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
            'scan_duration': scan_duration,
            'config_id': self.config_id,
            'sites': []
        }

        for site in self.config['sites']:
            report['sites'].append({
                'name': site['name'],
                'ips': [
                    {
                        'address': ip_config['address'],
                        'expected': ip_expected[ip_config['address']],
                        'actual': results_by_ip[ip_config['address']]['actual']
                    }
                    for ip_config in site['ips']
                ]
            })

        return report, scan_timestamp
    finally:
        # Clean up screenshot capture browser. Cleanup is best-effort: a
        # failure here must not mask a ScanCancelledError (or other error)
        # already propagating, nor discard a finished report.
        if self.screenshot_capture:
            try:
                self.screenshot_capture._close_browser()
            except Exception as e:
                print(f"Warning: failed to close screenshot browser: {e}",
                      file=sys.stderr, flush=True)
|
|
|
|
def save_report(self, report: Dict[str, Any], scan_timestamp: str) -> Path:
    """
    Write the scan report as indented JSON into the output directory

    Args:
        report: Scan report dictionary (must be JSON-serialisable)
        scan_timestamp: Timestamp string embedded in the file name

    Returns:
        Path of the written ``scan_report_<scan_timestamp>.json`` file
    """
    report_name = f"scan_report_{scan_timestamp}.json"
    destination = self.output_dir / report_name

    with destination.open('w') as handle:
        json.dump(report, handle, indent=2)

    print(f"\nReport saved to: {destination}", flush=True)
    return destination
|
|
|
|
def generate_outputs(self, report: Dict[str, Any], scan_timestamp: str) -> Dict[str, Path]:
    """
    Generate all output formats: JSON, HTML report, and ZIP archive

    The JSON report is always written. HTML generation and ZIP creation are
    best-effort: a failure in either is reported on stderr and the
    corresponding key is simply left out of the result.

    Args:
        report: Scan report dictionary
        scan_timestamp: Timestamp string in format YYYYMMDD_HHMMSS

    Returns:
        Dictionary with paths to generated files. ``'json'`` is always
        present; ``'html'`` and ``'zip'`` are present only when that step
        succeeded; ``'screenshots'`` is present when the
        ``scan_report_<scan_timestamp>_screenshots`` directory exists.
    """
    output_paths = {}
    divider = "=" * 60

    # Step 1: Save JSON report
    print("\n" + divider, flush=True)
    print("Generating outputs...", flush=True)
    print(divider, flush=True)

    json_path = self.save_report(report, scan_timestamp)
    output_paths['json'] = json_path

    # Step 2: Generate HTML report
    html_path = self.output_dir / f"scan_report_{scan_timestamp}.html"

    try:
        print("\nGenerating HTML report...", flush=True)

        # Auto-detect template directory relative to this script
        template_dir = Path(__file__).parent.parent / 'templates'

        # Create HTML report generator
        generator = HTMLReportGenerator(
            json_report_path=str(json_path),
            template_dir=str(template_dir)
        )

        # Generate report
        html_result = generator.generate_report(output_path=str(html_path))
        output_paths['html'] = Path(html_result)

        print(f"HTML report saved to: {html_path}", flush=True)

    except Exception as e:
        print(f"Warning: HTML report generation failed: {e}", file=sys.stderr, flush=True)
        print("Continuing with JSON output only...", file=sys.stderr, flush=True)
        # Don't add html_path to output_paths if it failed

    # Track screenshot directory for database storage. This is recorded
    # independently of the ZIP step so that a failed archive does not hide
    # screenshots that exist on disk.
    screenshot_dir = self.output_dir / f"scan_report_{scan_timestamp}_screenshots"
    has_screenshots = screenshot_dir.is_dir()
    if has_screenshots:
        output_paths['screenshots'] = screenshot_dir

    # Step 3: Create ZIP archive
    zip_path = self.output_dir / f"scan_report_{scan_timestamp}.zip"

    try:
        print("\nCreating ZIP archive...", flush=True)

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Add JSON report
            zipf.write(json_path, json_path.name)

            # Add HTML report if it was generated
            if 'html' in output_paths and html_path.exists():
                zipf.write(html_path, html_path.name)

            # Add all files in the screenshot directory, preserving the
            # directory name inside the archive
            if has_screenshots:
                for screenshot_file in screenshot_dir.iterdir():
                    if screenshot_file.is_file():
                        arcname = f"{screenshot_dir.name}/{screenshot_file.name}"
                        zipf.write(screenshot_file, arcname)

        output_paths['zip'] = zip_path
        print(f"ZIP archive saved to: {zip_path}", flush=True)

    except Exception as e:
        print(f"Warning: ZIP archive creation failed: {e}", file=sys.stderr, flush=True)
        # Don't add zip_path to output_paths if it failed, and do not leave
        # a truncated archive behind for later consumers to trip over.
        try:
            if zip_path.is_file():
                zip_path.unlink()
        except OSError as cleanup_error:
            print(f"Warning: could not remove incomplete ZIP archive: {cleanup_error}",
                  file=sys.stderr, flush=True)

    return output_paths
|
|
|
|
|
|
def main():
    """
    Command-line entry point: run one scan and write all of its outputs

    Parses the positional ``config`` argument and the ``-o/--output-dir``
    option, runs the scanner, generates the output artifacts and prints a
    short summary.

    Returns:
        0 when the scan and output generation complete, 1 when any
        ``Exception`` is raised along the way (its message goes to stderr).
    """
    # Log to stderr so stdout stays reserved for progress and summary output.
    logging.basicConfig(
        handlers=[logging.StreamHandler(sys.stderr)],
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    cli = argparse.ArgumentParser(
        description='SneakyScanner - Masscan-based network scanner'
    )
    cli.add_argument('config', help='Path to YAML configuration file')
    cli.add_argument(
        '-o', '--output-dir',
        default='/app/output',
        help='Output directory for scan results (default: /app/output)',
    )
    options = cli.parse_args()

    divider = "=" * 60

    try:
        scanner = SneakyScanner(options.config, options.output_dir)
        report, scan_timestamp = scanner.scan()
        artifacts = scanner.generate_outputs(report, scan_timestamp)

        summary = (
            "\n" + divider,
            "Scan completed successfully!",
            divider,
            f" JSON Report: {artifacts.get('json', 'N/A')}",
            f" HTML Report: {artifacts.get('html', 'N/A')}",
            f" ZIP Archive: {artifacts.get('zip', 'N/A')}",
            divider,
        )
        for line in summary:
            print(line, flush=True)

        return 0

    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr, flush=True)
        return 1
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|