Files
SneakyScan/app/web/api/scans.py
Phillip Tarrant 847e05abbe Changes Made
1. app/web/utils/validators.py - Added 'finalizing' to valid_statuses list
  2. app/web/models.py - Updated status field comment to document all valid statuses
  3. app/web/jobs/scan_job.py
  - Added transition to 'finalizing' status before output file generation
  - Sets current_phase = 'generating_outputs' during this phase
  - Wrapped output generation in try-except with proper error handling
  - If output generation fails, the scan is marked 'completed' with a warning message (scan data is still valid)

  4. app/web/api/scans.py
  - Added _recover_orphaned_scan() helper function for smart recovery
  - Modified stop_running_scan() to:
    - Allow stopping scans with status 'running' OR 'finalizing'
    - When scanner not in registry, perform smart recovery instead of returning 404
    - Smart recovery checks for output files and marks as 'completed' if found, 'cancelled' if not

  5. app/web/services/scan_service.py
  - Enhanced cleanup_orphaned_scans() with smart recovery logic
  - Now finds scans in both 'running' and 'finalizing' status
  - Returns dict with stats: {'recovered': N, 'failed': N, 'total': N}

  6. app/web/app.py - Updated caller to handle new dict return type from cleanup_orphaned_scans()

  Expected Behavior Now

  1. Normal scan flow: running → finalizing → completed
  2. Stop on active scan: Sends cancel signal, becomes 'cancelled'
  3. Stop on orphaned scan with files: Smart recovery → 'completed'
  4. Stop on orphaned scan without files: → 'cancelled'
  5. App restart with orphans: Startup cleanup uses smart recovery
2025-11-25 14:47:36 -06:00

643 lines
21 KiB
Python

"""
Scans API blueprint.
Handles endpoints for triggering scans, listing scan history, and retrieving
scan results.
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from flask import Blueprint, current_app, jsonify, request
from sqlalchemy.exc import SQLAlchemyError
from web.auth.decorators import api_auth_required
from web.models import Scan, ScanProgress
from web.services.scan_service import ScanService
from web.utils.pagination import validate_page_params
from web.jobs.scan_job import stop_scan
# Blueprint that groups every scan-related API route defined in this module.
bp = Blueprint('scans', __name__)
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
def _recover_orphaned_scan(scan: Scan, session, *, output_dir='/app/output') -> dict:
    """
    Recover an orphaned scan by checking for output files.

    The scan is treated as recoverable only when its JSON report exists on
    disk, either at the path stored on the record or, as a fallback, as a
    file in ``output_dir`` named ``scan_report_<YYYYMMDD>*.json`` for the
    scan's start date. HTML and ZIP files are reported when present but do
    not by themselves make the scan recoverable.

    If the JSON report exists: mark as 'completed' (smart recovery).
    Otherwise: mark as 'cancelled'.

    Args:
        scan: The orphaned Scan object (mutated in place)
        session: Database session; committed before returning
        output_dir: Directory searched by the fallback lookup
            (default: '/app/output')

    Returns:
        Dictionary with recovery result for API response
        (keys: scan_id, status, message, recovery_type)
    """
    found_types = []

    def _mark_found(kind):
        # A file type can be seen via the stored path and again via the
        # fallback lookup; list it only once in the response message.
        if kind not in found_types:
            found_types.append(kind)

    def _exists(path_value):
        return bool(path_value) and Path(path_value).exists()

    # Check paths stored in database
    json_found = _exists(scan.json_path)
    if json_found:
        _mark_found('json')
    if _exists(scan.html_path):
        _mark_found('html')
    if _exists(scan.zip_path):
        _mark_found('zip')

    # Also check by timestamp pattern if paths not stored yet
    if not json_found and scan.started_at:
        report_dir = Path(output_dir)
        if report_dir.exists():
            day_stamp = scan.started_at.strftime('%Y%m%d')
            # NOTE(review): the pattern is date-only, so with several scans
            # on the same day this can pick up another scan's report.
            # Confirm the report filename format and tighten the match.
            json_file = next(
                report_dir.glob(f'scan_report_{day_stamp}*.json'), None
            )
            if json_file is not None:
                json_found = True
                _mark_found('json')
                # Update scan record with found paths
                scan.json_path = str(json_file)
                for kind, attr in (('html', 'html_path'), ('zip', 'zip_path')):
                    sibling = json_file.with_suffix(f'.{kind}')
                    if sibling.exists():
                        setattr(scan, attr, str(sibling))
                        _mark_found(kind)

    # Capture the finish time once so completed_at and duration agree.
    # utcnow() yields a naive UTC datetime; presumably started_at is stored
    # the same way (the subtraction below requires it) - verify against model.
    finished_at = datetime.utcnow()
    scan.completed_at = finished_at
    if scan.started_at:
        scan.duration = (finished_at - scan.started_at).total_seconds()

    scan_id = scan.id
    if json_found:
        # Smart recovery: outputs exist, mark as completed
        scan.status = 'completed'
        scan.error_message = None
        log_line = f"Scan {scan_id}: Recovered as completed (files: {found_types})"
        result = {
            'scan_id': scan_id,
            'status': 'completed',
            'message': f'Scan recovered as completed (output files found: {", ".join(found_types)})',
            'recovery_type': 'smart_recovery'
        }
    else:
        # No outputs: mark as cancelled
        scan.status = 'cancelled'
        scan.error_message = 'Scan process was interrupted before completion. No output files were generated.'
        log_line = f"Scan {scan_id}: Marked as cancelled (orphaned, no output files)"
        result = {
            'scan_id': scan_id,
            'status': 'cancelled',
            'message': 'Orphaned scan cancelled (no output files found)',
            'recovery_type': 'orphan_cleanup'
        }

    session.commit()
    logger.info(log_line)
    return result
@bp.route('', methods=['GET'])
@api_auth_required
def list_scans():
    """
    Return one page of scans, optionally filtered by status.

    Query params:
        page: Page number (default: 1)
        per_page: Items per page (default: 20, max: 100)
        status: Filter by status (running, completed, failed)

    Returns:
        JSON response with scans list and pagination info.
        400 when parameter validation raises ValueError, 500 on database
        or unexpected errors.
    """
    try:
        query_args = request.args
        status_filter = query_args.get('status', None, type=str)

        # Read the raw paging values, then let the validator normalise them
        page, per_page = validate_page_params(
            query_args.get('page', 1, type=int),
            query_args.get('per_page', 20, type=int),
        )

        # Delegate the actual query to the service layer
        paginated_result = ScanService(current_app.db_session).list_scans(
            page=page,
            per_page=per_page,
            status_filter=status_filter
        )

        logger.info(f"Listed scans: page={page}, per_page={per_page}, status={status_filter}, total={paginated_result.total}")

        body = {
            'scans': paginated_result.items,
            'total': paginated_result.total,
            'page': paginated_result.page,
            'per_page': paginated_result.per_page,
            'total_pages': paginated_result.pages,
            'has_prev': paginated_result.has_prev,
            'has_next': paginated_result.has_next
        }
        return jsonify(body)

    except ValueError as e:
        logger.warning(f"Invalid request parameters: {str(e)}")
        error_body = {'error': 'Invalid request', 'message': str(e)}
        return jsonify(error_body), 400

    except SQLAlchemyError as e:
        logger.error(f"Database error listing scans: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to retrieve scans'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error listing scans: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
@bp.route('/<int:scan_id>', methods=['GET'])
@api_auth_required
def get_scan(scan_id):
    """
    Return the details of a single scan.

    Args:
        scan_id: Scan ID

    Returns:
        JSON response with scan details, 404 when the service returns no
        scan, 500 on database or unexpected errors.
    """
    try:
        scan = ScanService(current_app.db_session).get_scan(scan_id)

        if scan:
            logger.info(f"Retrieved scan details: {scan_id}")
            return jsonify(scan)

        # Falsy result from the service means the scan does not exist
        logger.warning(f"Scan not found: {scan_id}")
        not_found = {
            'error': 'Not found',
            'message': f'Scan with ID {scan_id} not found'
        }
        return jsonify(not_found), 404

    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving scan {scan_id}: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to retrieve scan'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error retrieving scan {scan_id}: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
@bp.route('', methods=['POST'])
@api_auth_required
def trigger_scan():
    """
    Trigger a new scan.

    Request body:
        config_id: Database config ID (required, integer)

    Returns:
        201 with scan_id and status when the scan is queued.
        400 when the body is missing, is not a JSON object, lacks
        config_id, carries a non-integer config_id, or the service raises
        ValueError for the config.
        500 on database or unexpected errors.
    """
    try:
        # silent=True makes a missing, non-JSON or malformed body come back
        # as None. Without it get_json() raises a werkzeug HTTP error, which
        # the generic handler below would report as a 500 instead of a 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Valid JSON that is not an object (list, string, number) has no
            # config_id either; treat it like an empty body.
            data = {}
        config_id = data.get('config_id')

        # Validate required fields
        if not config_id:
            logger.warning("Scan trigger request missing config_id")
            return jsonify({
                'error': 'Invalid request',
                'message': 'config_id is required'
            }), 400

        # Validate config_id is an integer. bool is rejected explicitly
        # because int(True) == 1 would silently select config 1.
        try:
            if isinstance(config_id, bool):
                raise TypeError('boolean is not a valid config_id')
            config_id = int(config_id)
        except (TypeError, ValueError):
            logger.warning(f"Invalid config_id type: {config_id}")
            return jsonify({
                'error': 'Invalid request',
                'message': 'config_id must be an integer'
            }), 400

        # Trigger scan via service
        scan_service = ScanService(current_app.db_session)
        scan_id = scan_service.trigger_scan(
            config_id=config_id,
            triggered_by='api',
            scheduler=current_app.scheduler
        )

        logger.info(f"Scan {scan_id} triggered via API: config_id={config_id}")

        return jsonify({
            'scan_id': scan_id,
            'status': 'running',
            'message': 'Scan queued successfully'
        }), 201

    except ValueError as e:
        # Config validation error
        error_message = str(e)
        logger.warning(f"Invalid config: {error_message}")
        logger.warning(f"Request data: config_id='{config_id}'")
        return jsonify({
            'error': 'Invalid request',
            'message': error_message
        }), 400

    except SQLAlchemyError as e:
        logger.error(f"Database error triggering scan: {str(e)}")
        return jsonify({
            'error': 'Database error',
            'message': 'Failed to create scan'
        }), 500

    except Exception as e:
        logger.error(f"Unexpected error triggering scan: {str(e)}", exc_info=True)
        return jsonify({
            'error': 'Internal server error',
            'message': 'An unexpected error occurred'
        }), 500
@bp.route('/<int:scan_id>', methods=['DELETE'])
@api_auth_required
def delete_scan(scan_id):
    """
    Remove a scan through the scan service.

    Args:
        scan_id: Scan ID to delete

    Returns:
        JSON response with deletion status; 404 when the service raises
        ValueError, 500 on database or unexpected errors.
    """
    try:
        ScanService(current_app.db_session).delete_scan(scan_id)

        logger.info(f"Scan {scan_id} deleted successfully")

        success_body = {
            'scan_id': scan_id,
            'message': 'Scan deleted successfully'
        }
        return jsonify(success_body), 200

    except ValueError as e:
        # The service signals an unknown scan with ValueError
        logger.warning(f"Scan deletion failed: {str(e)}")
        error_body = {'error': 'Not found', 'message': str(e)}
        return jsonify(error_body), 404

    except SQLAlchemyError as e:
        logger.error(f"Database error deleting scan {scan_id}: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to delete scan'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error deleting scan {scan_id}: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
@bp.route('/<int:scan_id>/stop', methods=['POST'])
@api_auth_required
def stop_running_scan(scan_id):
    """
    Stop a scan, falling back to smart recovery when it is orphaned.

    When stop_scan() reports success, a stop signal has been sent and the
    response says so. When it reports failure for a scan whose status is
    still 'running' or 'finalizing', the scan is treated as orphaned and
    handed to _recover_orphaned_scan(), which marks it 'completed' if
    output files exist and 'cancelled' otherwise.

    Args:
        scan_id: Scan ID to stop

    Returns:
        JSON response with stop status or recovery result; 404 for an
        unknown scan, 400 when the scan is not in a stoppable state,
        500 on database or unexpected errors.
    """
    try:
        session = current_app.db_session

        scan = session.query(Scan).filter_by(id=scan_id).first()
        if not scan:
            logger.warning(f"Scan not found for stop request: {scan_id}")
            not_found = {
                'error': 'Not found',
                'message': f'Scan with ID {scan_id} not found'
            }
            return jsonify(not_found), 404

        # Only 'running' and 'finalizing' scans may be stopped
        stoppable = scan.status in ('running', 'finalizing')
        if not stoppable:
            logger.warning(f"Cannot stop scan {scan_id}: status is '{scan.status}'")
            bad_state = {
                'error': 'Invalid state',
                'message': f"Cannot stop scan: status is '{scan.status}'"
            }
            return jsonify(bad_state), 400

        # stop_scan needs the database URL from the app config
        db_url = current_app.config['SQLALCHEMY_DATABASE_URI']

        if not stop_scan(scan_id, db_url):
            # Scanner not in registry - this is an orphaned scan
            logger.warning(f"Scan {scan_id} not in registry, attempting smart recovery")
            recovery_result = _recover_orphaned_scan(scan, session)
            return jsonify(recovery_result), 200

        logger.info(f"Stop signal sent to scan {scan_id}")
        stop_body = {
            'scan_id': scan_id,
            'message': 'Stop signal sent to scan',
            'status': 'stopping'
        }
        return jsonify(stop_body), 200

    except SQLAlchemyError as e:
        logger.error(f"Database error stopping scan {scan_id}: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to stop scan'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error stopping scan {scan_id}: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
@bp.route('/<int:scan_id>/status', methods=['GET'])
@api_auth_required
def get_scan_status(scan_id):
    """
    Return the status summary the scan service reports for a scan.

    Args:
        scan_id: Scan ID

    Returns:
        JSON response with scan status and progress; 404 when the service
        returns nothing, 500 on database or unexpected errors.
    """
    try:
        status = ScanService(current_app.db_session).get_scan_status(scan_id)

        if status:
            logger.debug(f"Retrieved status for scan {scan_id}: {status['status']}")
            return jsonify(status)

        # Falsy result from the service means the scan does not exist
        logger.warning(f"Scan not found for status check: {scan_id}")
        not_found = {
            'error': 'Not found',
            'message': f'Scan with ID {scan_id} not found'
        }
        return jsonify(not_found), 404

    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving scan status {scan_id}: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to retrieve scan status'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error retrieving scan status {scan_id}: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
@bp.route('/<int:scan_id>/progress', methods=['GET'])
@api_auth_required
def get_scan_progress(scan_id):
    """
    Return detailed progress for a scan, including per-IP results.

    Args:
        scan_id: Scan ID

    Returns:
        JSON response with scan progress including:
        - current_phase: Current scan phase ('pending' when unset)
        - total_ips: Total IPs being scanned (0 when unset)
        - completed_ips: Number of IPs completed in current phase (0 when unset)
        - progress_entries: List of per-IP progress with discovered results,
          ordered by site name and then numerically by IP
        404 for an unknown scan, 500 on database or unexpected errors.
    """
    def decode_json_list(raw):
        """Decode a JSON text column; empty or missing values become []."""
        return json.loads(raw) if raw else []

    def ip_sort_key(ip_str):
        """Convert IP to tuple of integers for proper numeric sorting."""
        try:
            return tuple(int(octet) for octet in ip_str.split('.'))
        except (ValueError, AttributeError):
            # Values that are not dotted integers all share one sort key
            return (0, 0, 0, 0)

    def entry_order(item):
        return (item['site_name'] or '', ip_sort_key(item['ip_address']))

    try:
        session = current_app.db_session

        scan = session.query(Scan).filter_by(id=scan_id).first()
        if not scan:
            logger.warning(f"Scan not found for progress check: {scan_id}")
            not_found = {
                'error': 'Not found',
                'message': f'Scan with ID {scan_id} not found'
            }
            return jsonify(not_found), 404

        progress_rows = session.query(ScanProgress).filter_by(scan_id=scan_id).all()

        # One dict per progress row, with the JSON text columns decoded
        entries = []
        for row in progress_rows:
            entries.append({
                'ip_address': row.ip_address,
                'site_name': row.site_name,
                'phase': row.phase,
                'status': row.status,
                'ping_result': row.ping_result,
                'tcp_ports': decode_json_list(row.tcp_ports),
                'udp_ports': decode_json_list(row.udp_ports),
                'services': decode_json_list(row.services)
            })

        # Sort entries by site name then IP (numerically)
        entries = sorted(entries, key=entry_order)

        response = {
            'scan_id': scan_id,
            'status': scan.status,
            'current_phase': scan.current_phase or 'pending',
            'total_ips': scan.total_ips or 0,
            'completed_ips': scan.completed_ips or 0,
            'progress_entries': entries
        }

        logger.debug(f"Retrieved progress for scan {scan_id}: phase={scan.current_phase}, {scan.completed_ips}/{scan.total_ips} IPs")
        return jsonify(response)

    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving scan progress {scan_id}: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to retrieve scan progress'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error retrieving scan progress {scan_id}: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
@bp.route('/by-ip/<ip_address>', methods=['GET'])
@api_auth_required
def get_scans_by_ip(ip_address):
    """
    Return the scans the service finds for a specific IP address.

    Args:
        ip_address: IP address to search for

    Returns:
        JSON response echoing the IP, the list of matching scans and
        their count; 500 on database or unexpected errors.
    """
    try:
        scans = ScanService(current_app.db_session).get_scans_by_ip(ip_address)

        logger.info(f"Retrieved {len(scans)} scans for IP: {ip_address}")

        body = {
            'ip_address': ip_address,
            'scans': scans,
            'count': len(scans)
        }
        return jsonify(body)

    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving scans for IP {ip_address}: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to retrieve scans'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error retrieving scans for IP {ip_address}: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
@bp.route('/<int:scan_id1>/compare/<int:scan_id2>', methods=['GET'])
@api_auth_required
def compare_scans(scan_id1, scan_id2):
    """
    Return the scan service's comparison of two scans.

    Args:
        scan_id1: First (older) scan ID
        scan_id2: Second (newer) scan ID

    Returns:
        JSON response with comparison results including:
        - scan1, scan2: Metadata for both scans
        - ports: Added, removed, and unchanged ports
        - services: Added, removed, and changed services
        - certificates: Added, removed, and changed certificates
        - drift_score: Overall drift metric (0.0-1.0)
        404 when the service returns no comparison, 500 on database or
        unexpected errors.
    """
    try:
        comparison = ScanService(current_app.db_session).compare_scans(scan_id1, scan_id2)

        if comparison:
            logger.info(f"Compared scans {scan_id1} and {scan_id2}: drift_score={comparison['drift_score']}")
            return jsonify(comparison), 200

        # Falsy result from the service is reported as a missing scan
        logger.warning(f"Scan comparison failed: one or both scans not found ({scan_id1}, {scan_id2})")
        not_found = {
            'error': 'Not found',
            'message': 'One or both scans not found'
        }
        return jsonify(not_found), 404

    except SQLAlchemyError as e:
        logger.error(f"Database error comparing scans {scan_id1} and {scan_id2}: {str(e)}")
        error_body = {'error': 'Database error', 'message': 'Failed to compare scans'}
        return jsonify(error_body), 500

    except Exception as e:
        logger.error(f"Unexpected error comparing scans {scan_id1} and {scan_id2}: {str(e)}", exc_info=True)
        error_body = {'error': 'Internal server error', 'message': 'An unexpected error occurred'}
        return jsonify(error_body), 500
# Health check endpoint
@bp.route('/health', methods=['GET'])
def health_check():
    """
    Report that the scans API is up, for use by monitoring.

    Unlike the other routes in this module, this one is not wrapped in
    api_auth_required.

    Returns:
        JSON response with a fixed health status, API name and version
    """
    health_body = {
        'status': 'healthy',
        'api': 'scans',
        'version': '1.0.0-phase1'
    }
    return jsonify(health_body)