Files
SneakyScan/app/web/api/stats.py

259 lines
8.0 KiB
Python

"""
Stats API blueprint.
Handles endpoints for dashboard statistics, trending data, and analytics.
"""
import logging
from datetime import datetime, timedelta
from flask import Blueprint, current_app, jsonify, request
from sqlalchemy import func, Date
from sqlalchemy.exc import SQLAlchemyError
from web.auth.decorators import api_auth_required
from web.models import Scan
bp = Blueprint('stats', __name__)
logger = logging.getLogger(__name__)
@bp.route('/scan-trend', methods=['GET'])
@api_auth_required
def scan_trend():
"""
Get scan activity trend data for charts.
Query params:
days: Number of days to include (default: 30, max: 365)
Returns:
JSON response with labels and values arrays for Chart.js
{
"labels": ["2025-01-01", "2025-01-02", ...],
"values": [5, 3, 7, 2, ...]
}
"""
try:
# Get and validate query parameters
days = request.args.get('days', 30, type=int)
# Validate days parameter
if days < 1:
return jsonify({'error': 'days parameter must be at least 1'}), 400
if days > 365:
return jsonify({'error': 'days parameter cannot exceed 365'}), 400
# Calculate date range
end_date = datetime.utcnow().date()
start_date = end_date - timedelta(days=days - 1)
# Query scan counts per day
db_session = current_app.db_session
scan_counts = (
db_session.query(
func.date(Scan.timestamp).label('scan_date'),
func.count(Scan.id).label('scan_count')
)
.filter(func.date(Scan.timestamp) >= start_date)
.filter(func.date(Scan.timestamp) <= end_date)
.group_by(func.date(Scan.timestamp))
.order_by('scan_date')
.all()
)
# Create a dictionary of date -> count
scan_dict = {str(row.scan_date): row.scan_count for row in scan_counts}
# Generate all dates in range (fill missing dates with 0)
labels = []
values = []
current_date = start_date
while current_date <= end_date:
date_str = str(current_date)
labels.append(date_str)
values.append(scan_dict.get(date_str, 0))
current_date += timedelta(days=1)
return jsonify({
'labels': labels,
'values': values,
'start_date': str(start_date),
'end_date': str(end_date),
'total_scans': sum(values)
}), 200
except SQLAlchemyError as e:
logger.error(f"Database error in scan_trend: {str(e)}")
return jsonify({'error': 'Database error occurred'}), 500
except Exception as e:
logger.error(f"Error in scan_trend: {str(e)}")
return jsonify({'error': 'An error occurred'}), 500
@bp.route('/summary', methods=['GET'])
@api_auth_required
def summary():
"""
Get dashboard summary statistics.
Returns:
JSON response with summary stats
{
"total_scans": 150,
"completed_scans": 140,
"failed_scans": 5,
"running_scans": 5,
"scans_today": 3,
"scans_this_week": 15
}
"""
try:
db_session = current_app.db_session
# Get total counts by status
total_scans = db_session.query(func.count(Scan.id)).scalar() or 0
completed_scans = db_session.query(func.count(Scan.id)).filter(
Scan.status == 'completed'
).scalar() or 0
failed_scans = db_session.query(func.count(Scan.id)).filter(
Scan.status == 'failed'
).scalar() or 0
running_scans = db_session.query(func.count(Scan.id)).filter(
Scan.status == 'running'
).scalar() or 0
# Get scans today
today = datetime.utcnow().date()
scans_today = db_session.query(func.count(Scan.id)).filter(
func.date(Scan.timestamp) == today
).scalar() or 0
# Get scans this week (last 7 days)
week_ago = today - timedelta(days=6)
scans_this_week = db_session.query(func.count(Scan.id)).filter(
func.date(Scan.timestamp) >= week_ago
).scalar() or 0
return jsonify({
'total_scans': total_scans,
'completed_scans': completed_scans,
'failed_scans': failed_scans,
'running_scans': running_scans,
'scans_today': scans_today,
'scans_this_week': scans_this_week
}), 200
except SQLAlchemyError as e:
logger.error(f"Database error in summary: {str(e)}")
return jsonify({'error': 'Database error occurred'}), 500
except Exception as e:
logger.error(f"Error in summary: {str(e)}")
return jsonify({'error': 'An error occurred'}), 500
@bp.route('/scan-history/<int:scan_id>', methods=['GET'])
@api_auth_required
def scan_history(scan_id):
"""
Get historical trend data for scans with the same config file.
Returns port counts and other metrics over time for the same
configuration/target as the specified scan.
Args:
scan_id: Reference scan ID
Query params:
limit: Maximum number of historical scans to include (default: 10, max: 50)
Returns:
JSON response with historical scan data
{
"scans": [
{
"id": 123,
"timestamp": "2025-01-01T12:00:00",
"title": "Scan title",
"port_count": 25,
"ip_count": 5
},
...
],
"labels": ["2025-01-01", ...],
"port_counts": [25, 26, 24, ...]
}
"""
try:
# Get query parameters
limit = request.args.get('limit', 10, type=int)
if limit > 50:
limit = 50
db_session = current_app.db_session
# Get the reference scan to find its config file
from web.models import ScanPort
reference_scan = db_session.query(Scan).filter(Scan.id == scan_id).first()
if not reference_scan:
return jsonify({'error': 'Scan not found'}), 404
config_id = reference_scan.config_id
# Query historical scans with the same config_id
historical_scans = (
db_session.query(Scan)
.filter(Scan.config_id == config_id)
.filter(Scan.status == 'completed')
.order_by(Scan.timestamp.desc())
.limit(limit)
.all()
)
# Build result data
scans_data = []
labels = []
port_counts = []
for scan in reversed(historical_scans): # Reverse to get chronological order
# Count ports for this scan
port_count = (
db_session.query(func.count(ScanPort.id))
.filter(ScanPort.scan_id == scan.id)
.scalar() or 0
)
# Count unique IPs for this scan
from web.models import ScanIP
ip_count = (
db_session.query(func.count(ScanIP.id))
.filter(ScanIP.scan_id == scan.id)
.scalar() or 0
)
scans_data.append({
'id': scan.id,
'timestamp': scan.timestamp.isoformat() if scan.timestamp else None,
'title': scan.title,
'port_count': port_count,
'ip_count': ip_count
})
# For chart data
labels.append(scan.timestamp.strftime('%Y-%m-%d %H:%M') if scan.timestamp else '')
port_counts.append(port_count)
return jsonify({
'scans': scans_data,
'labels': labels,
'port_counts': port_counts,
'config_id': config_id
}), 200
except SQLAlchemyError as e:
logger.error(f"Database error in scan_history: {str(e)}")
return jsonify({'error': 'Database error occurred'}), 500
except Exception as e:
logger.error(f"Error in scan_history: {str(e)}")
return jsonify({'error': 'An error occurred'}), 500