Files
SneakyScan/app/web/api/scans.py
Phillip Tarrant c592000c96 Add real-time scan progress tracking
- Add ScanProgress model and progress fields to Scan model
- Implement progress callback in scanner to report phase completion
- Update scan_job to write per-IP results to database during execution
- Add /api/scans/<id>/progress endpoint for progress polling
- Add progress section to scan detail page with live updates
- Progress table shows current phase, completion bar, and per-IP results
- Poll every 3 seconds during active scans
- Sort IPs numerically for proper ordering
- Add database migration for new tables/columns
2025-11-21 12:49:27 -06:00

485 lines
15 KiB
Python

"""
Scans API blueprint.
Handles endpoints for triggering scans, listing scan history, and retrieving
scan results.
"""
import json
import logging
from flask import Blueprint, current_app, jsonify, request
from sqlalchemy.exc import SQLAlchemyError
from web.auth.decorators import api_auth_required
from web.models import Scan, ScanProgress
from web.services.scan_service import ScanService
from web.utils.pagination import validate_page_params
bp = Blueprint('scans', __name__)
logger = logging.getLogger(__name__)
@bp.route('', methods=['GET'])
@api_auth_required
def list_scans():
"""
List all scans with pagination.
Query params:
page: Page number (default: 1)
per_page: Items per page (default: 20, max: 100)
status: Filter by status (running, completed, failed)
Returns:
JSON response with scans list and pagination info
"""
try:
# Get and validate query parameters
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
status_filter = request.args.get('status', None, type=str)
# Validate pagination params
page, per_page = validate_page_params(page, per_page)
# Get scans from service
scan_service = ScanService(current_app.db_session)
paginated_result = scan_service.list_scans(
page=page,
per_page=per_page,
status_filter=status_filter
)
logger.info(f"Listed scans: page={page}, per_page={per_page}, status={status_filter}, total={paginated_result.total}")
return jsonify({
'scans': paginated_result.items,
'total': paginated_result.total,
'page': paginated_result.page,
'per_page': paginated_result.per_page,
'total_pages': paginated_result.pages,
'has_prev': paginated_result.has_prev,
'has_next': paginated_result.has_next
})
except ValueError as e:
logger.warning(f"Invalid request parameters: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error listing scans: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve scans'
}), 500
except Exception as e:
logger.error(f"Unexpected error listing scans: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:scan_id>', methods=['GET'])
@api_auth_required
def get_scan(scan_id):
"""
Get details for a specific scan.
Args:
scan_id: Scan ID
Returns:
JSON response with scan details
"""
try:
# Get scan from service
scan_service = ScanService(current_app.db_session)
scan = scan_service.get_scan(scan_id)
if not scan:
logger.warning(f"Scan not found: {scan_id}")
return jsonify({
'error': 'Not found',
'message': f'Scan with ID {scan_id} not found'
}), 404
logger.info(f"Retrieved scan details: {scan_id}")
return jsonify(scan)
except SQLAlchemyError as e:
logger.error(f"Database error retrieving scan {scan_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve scan'
}), 500
except Exception as e:
logger.error(f"Unexpected error retrieving scan {scan_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('', methods=['POST'])
@api_auth_required
def trigger_scan():
"""
Trigger a new scan.
Request body:
config_id: Database config ID (required)
Returns:
JSON response with scan_id and status
"""
try:
# Get request data
data = request.get_json() or {}
config_id = data.get('config_id')
# Validate required fields
if not config_id:
logger.warning("Scan trigger request missing config_id")
return jsonify({
'error': 'Invalid request',
'message': 'config_id is required'
}), 400
# Validate config_id is an integer
try:
config_id = int(config_id)
except (TypeError, ValueError):
logger.warning(f"Invalid config_id type: {config_id}")
return jsonify({
'error': 'Invalid request',
'message': 'config_id must be an integer'
}), 400
# Trigger scan via service
scan_service = ScanService(current_app.db_session)
scan_id = scan_service.trigger_scan(
config_id=config_id,
triggered_by='api',
scheduler=current_app.scheduler
)
logger.info(f"Scan {scan_id} triggered via API: config_id={config_id}")
return jsonify({
'scan_id': scan_id,
'status': 'running',
'message': 'Scan queued successfully'
}), 201
except ValueError as e:
# Config validation error
error_message = str(e)
logger.warning(f"Invalid config: {error_message}")
logger.warning(f"Request data: config_id='{config_id}'")
return jsonify({
'error': 'Invalid request',
'message': error_message
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error triggering scan: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to create scan'
}), 500
except Exception as e:
logger.error(f"Unexpected error triggering scan: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:scan_id>', methods=['DELETE'])
@api_auth_required
def delete_scan(scan_id):
"""
Delete a scan and its associated files.
Args:
scan_id: Scan ID to delete
Returns:
JSON response with deletion status
"""
try:
# Delete scan via service
scan_service = ScanService(current_app.db_session)
scan_service.delete_scan(scan_id)
logger.info(f"Scan {scan_id} deleted successfully")
return jsonify({
'scan_id': scan_id,
'message': 'Scan deleted successfully'
}), 200
except ValueError as e:
# Scan not found
logger.warning(f"Scan deletion failed: {str(e)}")
return jsonify({
'error': 'Not found',
'message': str(e)
}), 404
except SQLAlchemyError as e:
logger.error(f"Database error deleting scan {scan_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to delete scan'
}), 500
except Exception as e:
logger.error(f"Unexpected error deleting scan {scan_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:scan_id>/status', methods=['GET'])
@api_auth_required
def get_scan_status(scan_id):
"""
Get current status of a running scan.
Args:
scan_id: Scan ID
Returns:
JSON response with scan status and progress
"""
try:
# Get scan status from service
scan_service = ScanService(current_app.db_session)
status = scan_service.get_scan_status(scan_id)
if not status:
logger.warning(f"Scan not found for status check: {scan_id}")
return jsonify({
'error': 'Not found',
'message': f'Scan with ID {scan_id} not found'
}), 404
logger.debug(f"Retrieved status for scan {scan_id}: {status['status']}")
return jsonify(status)
except SQLAlchemyError as e:
logger.error(f"Database error retrieving scan status {scan_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve scan status'
}), 500
except Exception as e:
logger.error(f"Unexpected error retrieving scan status {scan_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:scan_id>/progress', methods=['GET'])
@api_auth_required
def get_scan_progress(scan_id):
"""
Get detailed progress for a running scan including per-IP results.
Args:
scan_id: Scan ID
Returns:
JSON response with scan progress including:
- current_phase: Current scan phase
- total_ips: Total IPs being scanned
- completed_ips: Number of IPs completed in current phase
- progress_entries: List of per-IP progress with discovered results
"""
try:
session = current_app.db_session
# Get scan record
scan = session.query(Scan).filter_by(id=scan_id).first()
if not scan:
logger.warning(f"Scan not found for progress check: {scan_id}")
return jsonify({
'error': 'Not found',
'message': f'Scan with ID {scan_id} not found'
}), 404
# Get progress entries
progress_entries = session.query(ScanProgress).filter_by(scan_id=scan_id).all()
# Build progress data
entries = []
for entry in progress_entries:
entry_data = {
'ip_address': entry.ip_address,
'site_name': entry.site_name,
'phase': entry.phase,
'status': entry.status,
'ping_result': entry.ping_result
}
# Parse JSON fields
if entry.tcp_ports:
entry_data['tcp_ports'] = json.loads(entry.tcp_ports)
else:
entry_data['tcp_ports'] = []
if entry.udp_ports:
entry_data['udp_ports'] = json.loads(entry.udp_ports)
else:
entry_data['udp_ports'] = []
if entry.services:
entry_data['services'] = json.loads(entry.services)
else:
entry_data['services'] = []
entries.append(entry_data)
# Sort entries by site name then IP (numerically)
def ip_sort_key(ip_str):
"""Convert IP to tuple of integers for proper numeric sorting."""
try:
return tuple(int(octet) for octet in ip_str.split('.'))
except (ValueError, AttributeError):
return (0, 0, 0, 0)
entries.sort(key=lambda x: (x['site_name'] or '', ip_sort_key(x['ip_address'])))
response = {
'scan_id': scan_id,
'status': scan.status,
'current_phase': scan.current_phase or 'pending',
'total_ips': scan.total_ips or 0,
'completed_ips': scan.completed_ips or 0,
'progress_entries': entries
}
logger.debug(f"Retrieved progress for scan {scan_id}: phase={scan.current_phase}, {scan.completed_ips}/{scan.total_ips} IPs")
return jsonify(response)
except SQLAlchemyError as e:
logger.error(f"Database error retrieving scan progress {scan_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve scan progress'
}), 500
except Exception as e:
logger.error(f"Unexpected error retrieving scan progress {scan_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/by-ip/<ip_address>', methods=['GET'])
@api_auth_required
def get_scans_by_ip(ip_address):
"""
Get last 10 scans containing a specific IP address.
Args:
ip_address: IP address to search for
Returns:
JSON response with list of scans containing the IP
"""
try:
# Get scans from service
scan_service = ScanService(current_app.db_session)
scans = scan_service.get_scans_by_ip(ip_address)
logger.info(f"Retrieved {len(scans)} scans for IP: {ip_address}")
return jsonify({
'ip_address': ip_address,
'scans': scans,
'count': len(scans)
})
except SQLAlchemyError as e:
logger.error(f"Database error retrieving scans for IP {ip_address}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve scans'
}), 500
except Exception as e:
logger.error(f"Unexpected error retrieving scans for IP {ip_address}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:scan_id1>/compare/<int:scan_id2>', methods=['GET'])
@api_auth_required
def compare_scans(scan_id1, scan_id2):
"""
Compare two scans and show differences.
Compares ports, services, and certificates between two scans,
highlighting added, removed, and changed items.
Args:
scan_id1: First (older) scan ID
scan_id2: Second (newer) scan ID
Returns:
JSON response with comparison results including:
- scan1, scan2: Metadata for both scans
- ports: Added, removed, and unchanged ports
- services: Added, removed, and changed services
- certificates: Added, removed, and changed certificates
- drift_score: Overall drift metric (0.0-1.0)
"""
try:
# Compare scans using service
scan_service = ScanService(current_app.db_session)
comparison = scan_service.compare_scans(scan_id1, scan_id2)
if not comparison:
logger.warning(f"Scan comparison failed: one or both scans not found ({scan_id1}, {scan_id2})")
return jsonify({
'error': 'Not found',
'message': 'One or both scans not found'
}), 404
logger.info(f"Compared scans {scan_id1} and {scan_id2}: drift_score={comparison['drift_score']}")
return jsonify(comparison), 200
except SQLAlchemyError as e:
logger.error(f"Database error comparing scans {scan_id1} and {scan_id2}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to compare scans'
}), 500
except Exception as e:
logger.error(f"Unexpected error comparing scans {scan_id1} and {scan_id2}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
# Health check endpoint
@bp.route('/health', methods=['GET'])
def health_check():
"""
Health check endpoint for monitoring.
Returns:
JSON response with API health status
"""
return jsonify({
'status': 'healthy',
'api': 'scans',
'version': '1.0.0-phase1'
})