Implemented comprehensive scan comparison functionality with historical analysis and improved user experience for scan triggering. Features Added: - Scan comparison engine with ports, services, and certificates analysis - Drift score calculation (0.0-1.0 scale) for infrastructure changes - Side-by-side comparison UI with color-coded changes (added/removed/changed) - Historical trend charts showing port counts over time - "Compare with Previous" button on scan detail pages - Scan history API endpoint for trending data API Endpoints: - GET /api/scans/<id1>/compare/<id2> - Compare two scans - GET /api/stats/scan-history/<id> - Historical scan data for charts UI Improvements: - Replaced config file text inputs with dropdown selectors - Added config file selection to dashboard and scans pages - Improved delete scan confirmation with proper async handling - Enhanced error messages with detailed validation feedback - Added 2-second delay before redirect to ensure deletion completes Comparison Features: - Port changes: tracks added, removed, and unchanged ports - Service changes: detects version updates and service modifications - Certificate changes: monitors SSL/TLS certificate updates - Interactive historical charts with clickable data points - Automatic detection of previous scan for comparison Bug Fixes: - Fixed scan deletion UI alert appearing on successful deletion - Prevented config file path duplication (configs/configs/...) - Improved error handling for failed API responses - Added proper JSON response parsing with fallback handling Testing: - Created comprehensive test suite for comparison functionality - Tests cover comparison API, service methods, and drift scoring - Added edge case tests for identical scans and missing data
259 lines
8.0 KiB
Python
259 lines
8.0 KiB
Python
"""
|
|
Stats API blueprint.
|
|
|
|
Handles endpoints for dashboard statistics, trending data, and analytics.
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint, current_app, jsonify, request
|
|
from sqlalchemy import func, Date
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from web.auth.decorators import api_auth_required
|
|
from web.models import Scan
|
|
|
|
bp = Blueprint('stats', __name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@bp.route('/scan-trend', methods=['GET'])
|
|
@api_auth_required
|
|
def scan_trend():
|
|
"""
|
|
Get scan activity trend data for charts.
|
|
|
|
Query params:
|
|
days: Number of days to include (default: 30, max: 365)
|
|
|
|
Returns:
|
|
JSON response with labels and values arrays for Chart.js
|
|
{
|
|
"labels": ["2025-01-01", "2025-01-02", ...],
|
|
"values": [5, 3, 7, 2, ...]
|
|
}
|
|
"""
|
|
try:
|
|
# Get and validate query parameters
|
|
days = request.args.get('days', 30, type=int)
|
|
|
|
# Validate days parameter
|
|
if days < 1:
|
|
return jsonify({'error': 'days parameter must be at least 1'}), 400
|
|
if days > 365:
|
|
return jsonify({'error': 'days parameter cannot exceed 365'}), 400
|
|
|
|
# Calculate date range
|
|
end_date = datetime.utcnow().date()
|
|
start_date = end_date - timedelta(days=days - 1)
|
|
|
|
# Query scan counts per day
|
|
db_session = current_app.db_session
|
|
scan_counts = (
|
|
db_session.query(
|
|
func.date(Scan.timestamp).label('scan_date'),
|
|
func.count(Scan.id).label('scan_count')
|
|
)
|
|
.filter(func.date(Scan.timestamp) >= start_date)
|
|
.filter(func.date(Scan.timestamp) <= end_date)
|
|
.group_by(func.date(Scan.timestamp))
|
|
.order_by('scan_date')
|
|
.all()
|
|
)
|
|
|
|
# Create a dictionary of date -> count
|
|
scan_dict = {str(row.scan_date): row.scan_count for row in scan_counts}
|
|
|
|
# Generate all dates in range (fill missing dates with 0)
|
|
labels = []
|
|
values = []
|
|
current_date = start_date
|
|
while current_date <= end_date:
|
|
date_str = str(current_date)
|
|
labels.append(date_str)
|
|
values.append(scan_dict.get(date_str, 0))
|
|
current_date += timedelta(days=1)
|
|
|
|
return jsonify({
|
|
'labels': labels,
|
|
'values': values,
|
|
'start_date': str(start_date),
|
|
'end_date': str(end_date),
|
|
'total_scans': sum(values)
|
|
}), 200
|
|
|
|
except SQLAlchemyError as e:
|
|
logger.error(f"Database error in scan_trend: {str(e)}")
|
|
return jsonify({'error': 'Database error occurred'}), 500
|
|
except Exception as e:
|
|
logger.error(f"Error in scan_trend: {str(e)}")
|
|
return jsonify({'error': 'An error occurred'}), 500
|
|
|
|
|
|
@bp.route('/summary', methods=['GET'])
|
|
@api_auth_required
|
|
def summary():
|
|
"""
|
|
Get dashboard summary statistics.
|
|
|
|
Returns:
|
|
JSON response with summary stats
|
|
{
|
|
"total_scans": 150,
|
|
"completed_scans": 140,
|
|
"failed_scans": 5,
|
|
"running_scans": 5,
|
|
"scans_today": 3,
|
|
"scans_this_week": 15
|
|
}
|
|
"""
|
|
try:
|
|
db_session = current_app.db_session
|
|
|
|
# Get total counts by status
|
|
total_scans = db_session.query(func.count(Scan.id)).scalar() or 0
|
|
completed_scans = db_session.query(func.count(Scan.id)).filter(
|
|
Scan.status == 'completed'
|
|
).scalar() or 0
|
|
failed_scans = db_session.query(func.count(Scan.id)).filter(
|
|
Scan.status == 'failed'
|
|
).scalar() or 0
|
|
running_scans = db_session.query(func.count(Scan.id)).filter(
|
|
Scan.status == 'running'
|
|
).scalar() or 0
|
|
|
|
# Get scans today
|
|
today = datetime.utcnow().date()
|
|
scans_today = db_session.query(func.count(Scan.id)).filter(
|
|
func.date(Scan.timestamp) == today
|
|
).scalar() or 0
|
|
|
|
# Get scans this week (last 7 days)
|
|
week_ago = today - timedelta(days=6)
|
|
scans_this_week = db_session.query(func.count(Scan.id)).filter(
|
|
func.date(Scan.timestamp) >= week_ago
|
|
).scalar() or 0
|
|
|
|
return jsonify({
|
|
'total_scans': total_scans,
|
|
'completed_scans': completed_scans,
|
|
'failed_scans': failed_scans,
|
|
'running_scans': running_scans,
|
|
'scans_today': scans_today,
|
|
'scans_this_week': scans_this_week
|
|
}), 200
|
|
|
|
except SQLAlchemyError as e:
|
|
logger.error(f"Database error in summary: {str(e)}")
|
|
return jsonify({'error': 'Database error occurred'}), 500
|
|
except Exception as e:
|
|
logger.error(f"Error in summary: {str(e)}")
|
|
return jsonify({'error': 'An error occurred'}), 500
|
|
|
|
|
|
@bp.route('/scan-history/<int:scan_id>', methods=['GET'])
|
|
@api_auth_required
|
|
def scan_history(scan_id):
|
|
"""
|
|
Get historical trend data for scans with the same config file.
|
|
|
|
Returns port counts and other metrics over time for the same
|
|
configuration/target as the specified scan.
|
|
|
|
Args:
|
|
scan_id: Reference scan ID
|
|
|
|
Query params:
|
|
limit: Maximum number of historical scans to include (default: 10, max: 50)
|
|
|
|
Returns:
|
|
JSON response with historical scan data
|
|
{
|
|
"scans": [
|
|
{
|
|
"id": 123,
|
|
"timestamp": "2025-01-01T12:00:00",
|
|
"title": "Scan title",
|
|
"port_count": 25,
|
|
"ip_count": 5
|
|
},
|
|
...
|
|
],
|
|
"labels": ["2025-01-01", ...],
|
|
"port_counts": [25, 26, 24, ...]
|
|
}
|
|
"""
|
|
try:
|
|
# Get query parameters
|
|
limit = request.args.get('limit', 10, type=int)
|
|
if limit > 50:
|
|
limit = 50
|
|
|
|
db_session = current_app.db_session
|
|
|
|
# Get the reference scan to find its config file
|
|
from web.models import ScanPort
|
|
reference_scan = db_session.query(Scan).filter(Scan.id == scan_id).first()
|
|
|
|
if not reference_scan:
|
|
return jsonify({'error': 'Scan not found'}), 404
|
|
|
|
config_file = reference_scan.config_file
|
|
|
|
# Query historical scans with the same config file
|
|
historical_scans = (
|
|
db_session.query(Scan)
|
|
.filter(Scan.config_file == config_file)
|
|
.filter(Scan.status == 'completed')
|
|
.order_by(Scan.timestamp.desc())
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
# Build result data
|
|
scans_data = []
|
|
labels = []
|
|
port_counts = []
|
|
|
|
for scan in reversed(historical_scans): # Reverse to get chronological order
|
|
# Count ports for this scan
|
|
port_count = (
|
|
db_session.query(func.count(ScanPort.id))
|
|
.filter(ScanPort.scan_id == scan.id)
|
|
.scalar() or 0
|
|
)
|
|
|
|
# Count unique IPs for this scan
|
|
from web.models import ScanIP
|
|
ip_count = (
|
|
db_session.query(func.count(ScanIP.id))
|
|
.filter(ScanIP.scan_id == scan.id)
|
|
.scalar() or 0
|
|
)
|
|
|
|
scans_data.append({
|
|
'id': scan.id,
|
|
'timestamp': scan.timestamp.isoformat() if scan.timestamp else None,
|
|
'title': scan.title,
|
|
'port_count': port_count,
|
|
'ip_count': ip_count
|
|
})
|
|
|
|
# For chart data
|
|
labels.append(scan.timestamp.strftime('%Y-%m-%d %H:%M') if scan.timestamp else '')
|
|
port_counts.append(port_count)
|
|
|
|
return jsonify({
|
|
'scans': scans_data,
|
|
'labels': labels,
|
|
'port_counts': port_counts,
|
|
'config_file': config_file
|
|
}), 200
|
|
|
|
except SQLAlchemyError as e:
|
|
logger.error(f"Database error in scan_history: {str(e)}")
|
|
return jsonify({'error': 'Database error occurred'}), 500
|
|
except Exception as e:
|
|
logger.error(f"Error in scan_history: {str(e)}")
|
|
return jsonify({'error': 'An error occurred'}), 500
|