Add POST /api/alerts/acknowledge-all endpoint to bulk acknowledge all unacknowledged alerts. Add "Ack All" button to alerts page header with confirmation dialog for quick dismissal of all pending alerts.
535 lines
17 KiB
Python
535 lines
17 KiB
Python
"""
|
|
Alerts API blueprint.
|
|
|
|
Handles endpoints for viewing alert history and managing alert rules.
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime, timedelta, timezone
|
|
from flask import Blueprint, jsonify, request, current_app
|
|
|
|
from web.auth.decorators import api_auth_required
|
|
from web.models import Alert, AlertRule, Scan
|
|
from web.services.alert_service import AlertService
|
|
|
|
# Blueprint named 'alerts'; every route handler in this module registers on it
# via @bp.route. The URL prefix is applied wherever the blueprint is registered
# (not visible in this file).
bp = Blueprint('alerts', __name__)
|
|
|
|
|
|
@bp.route('', methods=['GET'])
@api_auth_required
def list_alerts():
    """
    List recent alerts.

    Query params:
        page: Page number (default: 1; values below 1 are treated as 1)
        per_page: Items per page (default: 20; clamped to the range 1-100)
        alert_type: Filter by alert type
        severity: Filter by severity (info, warning, critical)
        acknowledged: Filter by acknowledgment status (true/false)
        scan_id: Filter by specific scan
        start_date: Filter alerts after this date (ISO format)
        end_date: Filter alerts before this date (ISO format)

    Unparseable start_date/end_date values are ignored rather than rejected.

    Returns:
        JSON response with the alerts of the requested page plus pagination
        metadata (total, page, per_page, pages)
    """
    args = request.args

    # Clamp pagination inputs: page < 1 would produce a negative OFFSET, and
    # per_page < 1 would produce a non-positive LIMIT and a division by zero
    # in the page-count calculation at the end of this function.
    page = max(args.get('page', 1, type=int), 1)
    per_page = min(max(args.get('per_page', 20, type=int), 1), 100)

    alert_type = args.get('alert_type')
    severity = args.get('severity')
    acknowledged = args.get('acknowledged')
    scan_id = args.get('scan_id', type=int)
    start_dt = _parse_iso_datetime(args.get('start_date'))
    end_dt = _parse_iso_datetime(args.get('end_date'))

    # Build query
    query = current_app.db_session.query(Alert)

    # Apply filters
    if alert_type:
        query = query.filter(Alert.alert_type == alert_type)
    if severity:
        query = query.filter(Alert.severity == severity)
    if acknowledged is not None:
        # Anything other than "true" (case-insensitive) is treated as false.
        ack_bool = acknowledged.lower() == 'true'
        query = query.filter(Alert.acknowledged == ack_bool)
    if scan_id:
        query = query.filter(Alert.scan_id == scan_id)
    if start_dt is not None:
        query = query.filter(Alert.created_at >= start_dt)
    if end_dt is not None:
        query = query.filter(Alert.created_at <= end_dt)

    # Order by severity, then most recent first.
    # NOTE(review): this is a plain descending sort on the severity column. If
    # severity is stored as the strings used elsewhere in this module
    # ('critical', 'warning', 'info'), the resulting order is alphabetical
    # descending (warning, info, critical), not "critical first" - confirm
    # the intended ordering against the Alert model.
    query = query.order_by(
        Alert.severity.desc(),
        Alert.created_at.desc()
    )

    # Paginate
    total = query.count()
    alerts = query.offset((page - 1) * per_page).limit(per_page).all()

    # Resolve each distinct scan only once instead of issuing one lookup per
    # alert; alerts on the same page frequently share a scan.
    scan_titles = {}
    for alert in alerts:
        if alert.scan_id not in scan_titles:
            scan = current_app.db_session.query(Scan).filter(Scan.id == alert.scan_id).first()
            scan_titles[alert.scan_id] = scan.title if scan else None

    # Format response
    alerts_data = [
        {
            'id': alert.id,
            'scan_id': alert.scan_id,
            'scan_title': scan_titles[alert.scan_id],
            'rule_id': alert.rule_id,
            'alert_type': alert.alert_type,
            'severity': alert.severity,
            'message': alert.message,
            'ip_address': alert.ip_address,
            'port': alert.port,
            'acknowledged': alert.acknowledged,
            'acknowledged_at': alert.acknowledged_at.isoformat() if alert.acknowledged_at else None,
            'acknowledged_by': alert.acknowledged_by,
            'email_sent': alert.email_sent,
            'email_sent_at': alert.email_sent_at.isoformat() if alert.email_sent_at else None,
            'webhook_sent': alert.webhook_sent,
            'webhook_sent_at': alert.webhook_sent_at.isoformat() if alert.webhook_sent_at else None,
            'created_at': alert.created_at.isoformat()
        }
        for alert in alerts
    ]

    return jsonify({
        'alerts': alerts_data,
        'total': total,
        'page': page,
        'per_page': per_page,
        'pages': (total + per_page - 1) // per_page  # Ceiling division
    })


def _parse_iso_datetime(value):
    """Parse an ISO-format date string; return None when it is missing or invalid."""
    if not value:
        return None
    try:
        # A trailing 'Z' is rewritten to an explicit UTC offset before parsing.
        return datetime.fromisoformat(value.replace('Z', '+00:00'))
    except ValueError:
        return None
|
|
|
|
|
|
@bp.route('/<int:alert_id>/acknowledge', methods=['POST'])
@api_auth_required
def acknowledge_alert(alert_id):
    """
    Acknowledge an alert.

    Args:
        alert_id: Alert ID to acknowledge

    Request body (optional JSON object):
        acknowledged_by: Name recorded as the acknowledger (default: 'api')

    Returns:
        JSON response with acknowledgment status; HTTP 400 when the alert
        service reports that the acknowledgment failed
    """
    # The body is optional. silent=True makes a missing, non-JSON or malformed
    # body yield None instead of aborting the request, and the isinstance
    # check keeps a non-object JSON body (list, string, ...) from raising on
    # .get(). A null/empty acknowledged_by also falls back to 'api'.
    payload = request.get_json(silent=True)
    acknowledged_by = 'api'
    if isinstance(payload, dict):
        acknowledged_by = payload.get('acknowledged_by') or 'api'

    alert_service = AlertService(current_app.db_session)
    success = alert_service.acknowledge_alert(alert_id, acknowledged_by)

    if not success:
        return jsonify({
            'status': 'error',
            'message': f'Failed to acknowledge alert {alert_id}'
        }), 400

    return jsonify({
        'status': 'success',
        'message': f'Alert {alert_id} acknowledged',
        'acknowledged_by': acknowledged_by
    })
|
|
|
|
|
|
@bp.route('/acknowledge-all', methods=['POST'])
@api_auth_required
def acknowledge_all_alerts():
    """
    Acknowledge all unacknowledged alerts.

    Request body (optional JSON object):
        acknowledged_by: Name recorded as the acknowledger (default: 'api')

    Returns:
        JSON response with count of acknowledged alerts; HTTP 500 with an
        error message if the update could not be committed
    """
    # The body is optional. silent=True makes a missing, non-JSON or malformed
    # body yield None instead of aborting the request, and the isinstance
    # check keeps a non-object JSON body from raising on .get(). A null/empty
    # acknowledged_by also falls back to 'api'.
    payload = request.get_json(silent=True)
    acknowledged_by = 'api'
    if isinstance(payload, dict):
        acknowledged_by = payload.get('acknowledged_by') or 'api'

    try:
        # Get all unacknowledged alerts
        unacked_alerts = current_app.db_session.query(Alert).filter(
            Alert.acknowledged == False  # noqa: E712 - query filter expression, not an identity test
        ).all()

        # One timestamp for the whole batch so every alert acknowledged by
        # this request carries the same acknowledged_at value.
        acknowledged_at = datetime.now(timezone.utc)
        for alert in unacked_alerts:
            alert.acknowledged = True
            alert.acknowledged_at = acknowledged_at
            alert.acknowledged_by = acknowledged_by

        count = len(unacked_alerts)
        current_app.db_session.commit()

        return jsonify({
            'status': 'success',
            'message': f'Acknowledged {count} alerts',
            'count': count,
            'acknowledged_by': acknowledged_by
        })

    except Exception as e:
        current_app.db_session.rollback()
        # Keep a server-side record (with traceback); the response only
        # carries the exception text.
        current_app.logger.exception('Failed to acknowledge all alerts')
        return jsonify({
            'status': 'error',
            'message': f'Failed to acknowledge alerts: {str(e)}'
        }), 500
|
|
|
|
|
|
@bp.route('/rules', methods=['GET'])
@api_auth_required
def list_alert_rules():
    """
    Return every alert rule, ordered by name and then by rule type.

    Returns:
        JSON response with the serialized rules under 'rules' and their
        number under 'total'
    """
    session = current_app.db_session
    ordered_rules = (
        session.query(AlertRule)
        .order_by(AlertRule.name, AlertRule.rule_type)
        .all()
    )

    # filter_conditions is stored as a JSON string and decoded for the
    # response; config_title comes from the rule's related config, if any.
    serialized = [
        {
            'id': item.id,
            'name': item.name,
            'rule_type': item.rule_type,
            'enabled': item.enabled,
            'threshold': item.threshold,
            'email_enabled': item.email_enabled,
            'webhook_enabled': item.webhook_enabled,
            'severity': item.severity,
            'filter_conditions': json.loads(item.filter_conditions) if item.filter_conditions else None,
            'config_id': item.config_id,
            'config_title': item.config.title if item.config else None,
            'created_at': item.created_at.isoformat(),
            'updated_at': item.updated_at.isoformat() if item.updated_at else None
        }
        for item in ordered_rules
    ]

    response_body = {
        'rules': serialized,
        'total': len(serialized)
    }
    return jsonify(response_body)
|
|
|
|
|
|
@bp.route('/rules', methods=['POST'])
@api_auth_required
def create_alert_rule():
    """
    Create a new alert rule.

    Request body (JSON object):
        name: User-friendly rule name (default: "<rule_type> rule")
        rule_type: Type of alert rule (unexpected_port, drift_detection, cert_expiry, weak_tls, ping_failed)
        threshold: Threshold value (e.g., days for cert expiry, percentage for drift)
        enabled: Whether rule is active (default: true)
        email_enabled: Send email for this rule (default: false)
        webhook_enabled: Send webhook for this rule (default: false)
        severity: Alert severity (critical, warning, info; default: warning)
        filter_conditions: JSON object with filter conditions
        config_id: Optional config ID to apply rule to

    Returns:
        JSON response with created rule (HTTP 201); HTTP 400 for an invalid
        body, rule_type, severity or unknown config_id; HTTP 500 if the rule
        could not be saved
    """
    data = request.get_json() or {}

    # A JSON array/string/number is valid JSON but not a usable rule
    # definition; reject it here instead of failing on data.get() below.
    if not isinstance(data, dict):
        return jsonify({
            'status': 'error',
            'message': 'Request body must be a JSON object'
        }), 400

    # Validate required fields
    if not data.get('rule_type'):
        return jsonify({
            'status': 'error',
            'message': 'rule_type is required'
        }), 400

    # Valid rule types
    valid_rule_types = ['unexpected_port', 'drift_detection', 'cert_expiry', 'weak_tls', 'ping_failed']
    if data['rule_type'] not in valid_rule_types:
        return jsonify({
            'status': 'error',
            'message': f'Invalid rule_type. Must be one of: {", ".join(valid_rule_types)}'
        }), 400

    # Valid severities
    valid_severities = ['critical', 'warning', 'info']
    if data.get('severity') and data['severity'] not in valid_severities:
        return jsonify({
            'status': 'error',
            'message': f'Invalid severity. Must be one of: {", ".join(valid_severities)}'
        }), 400

    try:
        # Validate config_id if provided
        config_id = data.get('config_id')
        if config_id:
            from web.models import ScanConfig
            config = current_app.db_session.query(ScanConfig).filter_by(id=config_id).first()
            if not config:
                return jsonify({
                    'status': 'error',
                    'message': f'Config with ID {config_id} not found'
                }), 400

        # Single timestamp so created_at and updated_at are identical on a
        # freshly created rule.
        now = datetime.now(timezone.utc)

        # Create new rule
        rule = AlertRule(
            name=data.get('name', f"{data['rule_type']} rule"),
            rule_type=data['rule_type'],
            enabled=data.get('enabled', True),
            threshold=data.get('threshold'),
            email_enabled=data.get('email_enabled', False),
            webhook_enabled=data.get('webhook_enabled', False),
            severity=data.get('severity', 'warning'),
            # Stored as a JSON string; an empty/missing value is stored as None.
            filter_conditions=json.dumps(data['filter_conditions']) if data.get('filter_conditions') else None,
            config_id=config_id,
            created_at=now,
            updated_at=now
        )

        current_app.db_session.add(rule)
        current_app.db_session.commit()

        return jsonify({
            'status': 'success',
            'message': 'Alert rule created successfully',
            'rule': {
                'id': rule.id,
                'name': rule.name,
                'rule_type': rule.rule_type,
                'enabled': rule.enabled,
                'threshold': rule.threshold,
                'email_enabled': rule.email_enabled,
                'webhook_enabled': rule.webhook_enabled,
                'severity': rule.severity,
                'filter_conditions': json.loads(rule.filter_conditions) if rule.filter_conditions else None,
                'config_id': rule.config_id,
                'config_title': rule.config.title if rule.config else None,
                'created_at': rule.created_at.isoformat(),
                'updated_at': rule.updated_at.isoformat()
            }
        }), 201

    except Exception as e:
        current_app.db_session.rollback()
        # Keep a server-side record (with traceback); the response only
        # carries the exception text.
        current_app.logger.exception('Failed to create alert rule')
        return jsonify({
            'status': 'error',
            'message': f'Failed to create alert rule: {str(e)}'
        }), 500
|
|
|
|
|
|
@bp.route('/rules/<int:rule_id>', methods=['PUT'])
@api_auth_required
def update_alert_rule(rule_id):
    """
    Update an existing alert rule.

    Only the fields present in the request body are changed; updated_at is
    always refreshed.

    Args:
        rule_id: Alert rule ID to update

    Request body (JSON object):
        name: User-friendly rule name (optional)
        threshold: Threshold value (optional)
        enabled: Whether rule is active (optional)
        email_enabled: Send email for this rule (optional)
        webhook_enabled: Send webhook for this rule (optional)
        severity: Alert severity; when present must be critical, warning or info (optional)
        filter_conditions: JSON object with filter conditions (optional)
        config_id: Config ID to apply rule to (optional)

    Returns:
        JSON response with update status and the updated rule; HTTP 404 if
        the rule does not exist, HTTP 400 for an invalid body, severity or
        unknown config_id, HTTP 500 if the update could not be saved
    """
    data = request.get_json() or {}

    # A JSON array/string/number is valid JSON but cannot carry field
    # updates; reject it here instead of failing on data.get() below.
    if not isinstance(data, dict):
        return jsonify({
            'status': 'error',
            'message': 'Request body must be a JSON object'
        }), 400

    # Get existing rule
    rule = current_app.db_session.query(AlertRule).filter(AlertRule.id == rule_id).first()
    if not rule:
        return jsonify({
            'status': 'error',
            'message': f'Alert rule {rule_id} not found'
        }), 404

    # Validate severity whenever the key is present. Checking only truthy
    # values would let null or "" skip validation and still be written to
    # the rule by the field-update loop below.
    valid_severities = ['critical', 'warning', 'info']
    if 'severity' in data and data['severity'] not in valid_severities:
        return jsonify({
            'status': 'error',
            'message': f'Invalid severity. Must be one of: {", ".join(valid_severities)}'
        }), 400

    try:
        # Validate config_id if provided (a falsy config_id clears the link
        # and needs no lookup)
        if 'config_id' in data:
            config_id = data['config_id']
            if config_id:
                from web.models import ScanConfig
                config = current_app.db_session.query(ScanConfig).filter_by(id=config_id).first()
                if not config:
                    return jsonify({
                        'status': 'error',
                        'message': f'Config with ID {config_id} not found'
                    }), 400

        # Update fields if provided: these are copied through unchanged.
        for field in ('name', 'threshold', 'enabled', 'email_enabled', 'webhook_enabled', 'severity'):
            if field in data:
                setattr(rule, field, data[field])

        # filter_conditions is stored as a JSON string (None when empty).
        if 'filter_conditions' in data:
            rule.filter_conditions = json.dumps(data['filter_conditions']) if data['filter_conditions'] else None
        if 'config_id' in data:
            rule.config_id = data['config_id']

        rule.updated_at = datetime.now(timezone.utc)

        current_app.db_session.commit()

        return jsonify({
            'status': 'success',
            'message': 'Alert rule updated successfully',
            'rule': {
                'id': rule.id,
                'name': rule.name,
                'rule_type': rule.rule_type,
                'enabled': rule.enabled,
                'threshold': rule.threshold,
                'email_enabled': rule.email_enabled,
                'webhook_enabled': rule.webhook_enabled,
                'severity': rule.severity,
                'filter_conditions': json.loads(rule.filter_conditions) if rule.filter_conditions else None,
                'config_id': rule.config_id,
                'config_title': rule.config.title if rule.config else None,
                'created_at': rule.created_at.isoformat(),
                'updated_at': rule.updated_at.isoformat()
            }
        })

    except Exception as e:
        current_app.db_session.rollback()
        # Keep a server-side record (with traceback); the response only
        # carries the exception text.
        current_app.logger.exception('Failed to update alert rule %s', rule_id)
        return jsonify({
            'status': 'error',
            'message': f'Failed to update alert rule: {str(e)}'
        }), 500
|
|
|
|
|
|
@bp.route('/rules/<int:rule_id>', methods=['DELETE'])
@api_auth_required
def delete_alert_rule(rule_id):
    """
    Remove an alert rule by ID.

    Args:
        rule_id: ID of the alert rule to remove

    Returns:
        JSON response with deletion status; HTTP 404 when no rule has this
        ID, HTTP 500 when the deletion could not be committed
    """
    session = current_app.db_session

    # Look the rule up first so a missing ID is reported as 404.
    target = session.query(AlertRule).filter(AlertRule.id == rule_id).first()
    if not target:
        not_found = {
            'status': 'error',
            'message': f'Alert rule {rule_id} not found'
        }
        return jsonify(not_found), 404

    try:
        # NOTE(review): the original comment states that related alerts are
        # removed via cascade - confirm against the model definitions.
        session.delete(target)
        session.commit()

        deleted = {
            'status': 'success',
            'message': f'Alert rule {rule_id} deleted successfully'
        }
        return jsonify(deleted)

    except Exception as e:
        session.rollback()
        failure = {
            'status': 'error',
            'message': f'Failed to delete alert rule: {str(e)}'
        }
        return jsonify(failure), 500
|
|
|
|
|
|
@bp.route('/stats', methods=['GET'])
@api_auth_required
def alert_stats():
    """
    Get alert statistics.

    Query params:
        days: Number of days to look back (default: 7)

    Returns:
        JSON response with alert statistics (total, unacknowledged count,
        counts per severity and per alert type, and the date range used);
        HTTP 400 when days is too large to form a valid date
    """
    days = request.args.get('days', 7, type=int)

    # Capture "now" once so the reported range start and end are exactly
    # `days` apart.
    now = datetime.now(timezone.utc)
    try:
        cutoff_date = now - timedelta(days=days)
    except OverflowError:
        # Very large values overflow timedelta or the datetime range; report
        # a client error instead of letting the request fail with a 500.
        return jsonify({
            'status': 'error',
            'message': 'days is out of range'
        }), 400

    # Get alerts in date range
    alerts = current_app.db_session.query(Alert).filter(Alert.created_at >= cutoff_date).all()

    # Calculate statistics
    total_alerts = len(alerts)
    # Severities outside these three keys are not counted per severity.
    alerts_by_severity = {'critical': 0, 'warning': 0, 'info': 0}
    alerts_by_type = {}
    unacknowledged_count = 0

    for alert in alerts:
        # Count by severity
        if alert.severity in alerts_by_severity:
            alerts_by_severity[alert.severity] += 1

        # Count by type
        alerts_by_type[alert.alert_type] = alerts_by_type.get(alert.alert_type, 0) + 1

        # Count unacknowledged
        if not alert.acknowledged:
            unacknowledged_count += 1

    return jsonify({
        'stats': {
            'total_alerts': total_alerts,
            'unacknowledged_count': unacknowledged_count,
            'alerts_by_severity': alerts_by_severity,
            'alerts_by_type': alerts_by_type,
            'date_range': {
                'start': cutoff_date.isoformat(),
                'end': now.isoformat(),
                'days': days
            }
        }
    })
|
|
|
|
|
|
# Health check endpoint (registered without the api_auth_required decorator
# that the other routes in this module use)
@bp.route('/health', methods=['GET'])
def health_check():
    """
    Report that the alerts API is up, for use by monitoring.

    Returns:
        JSON response with a fixed status, API name and version string
    """
    health_payload = {
        'status': 'healthy',
        'api': 'alerts',
        'version': '1.0.0-phase5'
    }
    return jsonify(health_payload)