""" Stats API blueprint. Handles endpoints for dashboard statistics, trending data, and analytics. """ import logging from datetime import datetime, timedelta from flask import Blueprint, current_app, jsonify, request from sqlalchemy import func, Date from sqlalchemy.exc import SQLAlchemyError from web.auth.decorators import api_auth_required from web.models import Scan bp = Blueprint('stats', __name__) logger = logging.getLogger(__name__) @bp.route('/scan-trend', methods=['GET']) @api_auth_required def scan_trend(): """ Get scan activity trend data for charts. Query params: days: Number of days to include (default: 30, max: 365) Returns: JSON response with labels and values arrays for Chart.js { "labels": ["2025-01-01", "2025-01-02", ...], "values": [5, 3, 7, 2, ...] } """ try: # Get and validate query parameters days = request.args.get('days', 30, type=int) # Validate days parameter if days < 1: return jsonify({'error': 'days parameter must be at least 1'}), 400 if days > 365: return jsonify({'error': 'days parameter cannot exceed 365'}), 400 # Calculate date range end_date = datetime.utcnow().date() start_date = end_date - timedelta(days=days - 1) # Query scan counts per day db_session = current_app.db_session scan_counts = ( db_session.query( func.date(Scan.timestamp).label('scan_date'), func.count(Scan.id).label('scan_count') ) .filter(func.date(Scan.timestamp) >= start_date) .filter(func.date(Scan.timestamp) <= end_date) .group_by(func.date(Scan.timestamp)) .order_by('scan_date') .all() ) # Create a dictionary of date -> count scan_dict = {str(row.scan_date): row.scan_count for row in scan_counts} # Generate all dates in range (fill missing dates with 0) labels = [] values = [] current_date = start_date while current_date <= end_date: date_str = str(current_date) labels.append(date_str) values.append(scan_dict.get(date_str, 0)) current_date += timedelta(days=1) return jsonify({ 'labels': labels, 'values': values, 'start_date': str(start_date), 'end_date': str(end_date), 'total_scans': sum(values) }), 200 except SQLAlchemyError as e: logger.error(f"Database error in scan_trend: {str(e)}") return jsonify({'error': 'Database error occurred'}), 500 except Exception as e: logger.error(f"Error in scan_trend: {str(e)}") return jsonify({'error': 'An error occurred'}), 500 @bp.route('/summary', methods=['GET']) @api_auth_required def summary(): """ Get dashboard summary statistics. Returns: JSON response with summary stats { "total_scans": 150, "completed_scans": 140, "failed_scans": 5, "running_scans": 5, "scans_today": 3, "scans_this_week": 15 } """ try: db_session = current_app.db_session # Get total counts by status total_scans = db_session.query(func.count(Scan.id)).scalar() or 0 completed_scans = db_session.query(func.count(Scan.id)).filter( Scan.status == 'completed' ).scalar() or 0 failed_scans = db_session.query(func.count(Scan.id)).filter( Scan.status == 'failed' ).scalar() or 0 running_scans = db_session.query(func.count(Scan.id)).filter( Scan.status == 'running' ).scalar() or 0 # Get scans today today = datetime.utcnow().date() scans_today = db_session.query(func.count(Scan.id)).filter( func.date(Scan.timestamp) == today ).scalar() or 0 # Get scans this week (last 7 days) week_ago = today - timedelta(days=6) scans_this_week = db_session.query(func.count(Scan.id)).filter( func.date(Scan.timestamp) >= week_ago ).scalar() or 0 return jsonify({ 'total_scans': total_scans, 'completed_scans': completed_scans, 'failed_scans': failed_scans, 'running_scans': running_scans, 'scans_today': scans_today, 'scans_this_week': scans_this_week }), 200 except SQLAlchemyError as e: logger.error(f"Database error in summary: {str(e)}") return jsonify({'error': 'Database error occurred'}), 500 except Exception as e: logger.error(f"Error in summary: {str(e)}") return jsonify({'error': 'An error occurred'}), 500