""" Webhook Service Module Handles webhook delivery for alert notifications with retry logic, authentication support, and comprehensive logging. """ import json import logging import time from datetime import datetime, timezone from typing import List, Dict, Optional, Any, Tuple from sqlalchemy.orm import Session import requests from requests.auth import HTTPBasicAuth from cryptography.fernet import Fernet import os from ..models import Webhook, WebhookDeliveryLog, Alert, AlertRule, Scan from .template_service import get_template_service from ..config import APP_NAME, APP_VERSION, REPO_URL logger = logging.getLogger(__name__) class WebhookService: """ Service for webhook delivery and management. Handles queuing webhook deliveries, executing HTTP requests with authentication, retry logic, and logging delivery attempts. """ def __init__(self, db_session: Session, encryption_key: Optional[bytes] = None): """ Initialize webhook service. Args: db_session: SQLAlchemy database session encryption_key: Fernet encryption key for auth_token encryption """ self.db = db_session self._encryption_key = encryption_key or self._get_encryption_key() self._cipher = Fernet(self._encryption_key) if self._encryption_key else None def _get_encryption_key(self) -> Optional[bytes]: """ Get encryption key from environment or database. Returns: Fernet encryption key or None if not available """ # Try environment variable first key_str = os.environ.get('SNEAKYSCANNER_ENCRYPTION_KEY') if key_str: return key_str.encode() # Try to get from settings (would need to query Setting table) # For now, generate a temporary key if none exists try: return Fernet.generate_key() except Exception as e: logger.warning(f"Could not generate encryption key: {e}") return None def _encrypt_value(self, value: str) -> str: """Encrypt a string value.""" if not self._cipher: return value # Return plain text if encryption not available return self._cipher.encrypt(value.encode()).decode() def _decrypt_value(self, encrypted_value: str) -> str: """Decrypt an encrypted string value.""" if not self._cipher: return encrypted_value # Return as-is if encryption not available try: return self._cipher.decrypt(encrypted_value.encode()).decode() except Exception as e: logger.error(f"Failed to decrypt value: {e}") return encrypted_value def get_matching_webhooks(self, alert: Alert) -> List[Webhook]: """ Get all enabled webhooks that match an alert's type and severity. Args: alert: Alert object to match against Returns: List of matching Webhook objects """ # Get all enabled webhooks webhooks = self.db.query(Webhook).filter(Webhook.enabled == True).all() matching_webhooks = [] for webhook in webhooks: # Check if webhook matches alert type filter if webhook.alert_types: try: alert_types = json.loads(webhook.alert_types) if alert.alert_type not in alert_types: continue # Skip if alert type doesn't match except json.JSONDecodeError: logger.warning(f"Invalid alert_types JSON for webhook {webhook.id}") continue # Check if webhook matches severity filter if webhook.severity_filter: try: severity_filter = json.loads(webhook.severity_filter) if alert.severity not in severity_filter: continue # Skip if severity doesn't match except json.JSONDecodeError: logger.warning(f"Invalid severity_filter JSON for webhook {webhook.id}") continue matching_webhooks.append(webhook) logger.info(f"Found {len(matching_webhooks)} matching webhooks for alert {alert.id}") return matching_webhooks def queue_webhook_delivery(self, webhook_id: int, alert_id: int, scheduler_service=None) -> bool: """ Queue a webhook delivery for async execution via APScheduler. Args: webhook_id: ID of webhook to deliver alert_id: ID of alert to send scheduler_service: SchedulerService instance (if None, deliver synchronously) Returns: True if queued successfully, False otherwise """ if scheduler_service and scheduler_service.scheduler: try: # Import here to avoid circular dependency from web.jobs.webhook_job import execute_webhook_delivery # Schedule immediate execution scheduler_service.scheduler.add_job( execute_webhook_delivery, args=[webhook_id, alert_id, scheduler_service.db_url], id=f"webhook_{webhook_id}_{alert_id}_{int(time.time())}", replace_existing=False ) logger.info(f"Queued webhook {webhook_id} for alert {alert_id}") return True except Exception as e: logger.error(f"Failed to queue webhook delivery: {e}") # Fall back to synchronous delivery return self.deliver_webhook(webhook_id, alert_id) else: # No scheduler available, deliver synchronously logger.info(f"No scheduler available, delivering webhook {webhook_id} synchronously") return self.deliver_webhook(webhook_id, alert_id) def deliver_webhook(self, webhook_id: int, alert_id: int, attempt_number: int = 1) -> bool: """ Deliver a webhook with retry logic. Args: webhook_id: ID of webhook to deliver alert_id: ID of alert to send attempt_number: Current attempt number (for retries) Returns: True if delivered successfully, False otherwise """ # Get webhook and alert webhook = self.db.query(Webhook).filter(Webhook.id == webhook_id).first() if not webhook: logger.error(f"Webhook {webhook_id} not found") return False alert = self.db.query(Alert).filter(Alert.id == alert_id).first() if not alert: logger.error(f"Alert {alert_id} not found") return False logger.info(f"Delivering webhook {webhook_id} for alert {alert_id} (attempt {attempt_number}/{webhook.retry_count})") # Build payload with template support payload, content_type = self._build_payload(webhook, alert) # Prepare headers headers = {'Content-Type': content_type} # Add custom headers if provided if webhook.custom_headers: try: custom_headers = json.loads(webhook.custom_headers) headers.update(custom_headers) except json.JSONDecodeError: logger.warning(f"Invalid custom_headers JSON for webhook {webhook_id}") # Prepare authentication auth = None if webhook.auth_type == 'bearer' and webhook.auth_token: decrypted_token = self._decrypt_value(webhook.auth_token) headers['Authorization'] = f'Bearer {decrypted_token}' elif webhook.auth_type == 'basic' and webhook.auth_token: # Expecting format: "username:password" decrypted_token = self._decrypt_value(webhook.auth_token) if ':' in decrypted_token: username, password = decrypted_token.split(':', 1) auth = HTTPBasicAuth(username, password) # Execute HTTP request try: timeout = webhook.timeout or 10 # Use appropriate parameter based on payload type if isinstance(payload, dict): # JSON payload response = requests.post( webhook.url, json=payload, headers=headers, auth=auth, timeout=timeout ) else: # Text payload response = requests.post( webhook.url, data=payload, headers=headers, auth=auth, timeout=timeout ) # Log delivery attempt log_entry = WebhookDeliveryLog( webhook_id=webhook_id, alert_id=alert_id, status='success' if response.status_code < 400 else 'failed', response_code=response.status_code, response_body=response.text[:1000], # Limit to 1000 chars error_message=None if response.status_code < 400 else f"HTTP {response.status_code}", attempt_number=attempt_number, delivered_at=datetime.now(timezone.utc) ) self.db.add(log_entry) # Update alert webhook status if successful if response.status_code < 400: alert.webhook_sent = True alert.webhook_sent_at = datetime.now(timezone.utc) logger.info(f"Webhook {webhook_id} delivered successfully (HTTP {response.status_code})") self.db.commit() return True else: # Failed but got a response logger.warning(f"Webhook {webhook_id} failed with HTTP {response.status_code}") self.db.commit() # Retry if attempts remaining if attempt_number < webhook.retry_count: delay = self._calculate_retry_delay(attempt_number) logger.info(f"Retrying webhook {webhook_id} in {delay} seconds") time.sleep(delay) return self.deliver_webhook(webhook_id, alert_id, attempt_number + 1) return False except requests.exceptions.Timeout: error_msg = f"Request timeout after {timeout} seconds" logger.error(f"Webhook {webhook_id} timeout: {error_msg}") self._log_delivery_failure(webhook_id, alert_id, error_msg, attempt_number) except requests.exceptions.ConnectionError as e: error_msg = f"Connection error: {str(e)}" logger.error(f"Webhook {webhook_id} connection error: {error_msg}") self._log_delivery_failure(webhook_id, alert_id, error_msg, attempt_number) except requests.exceptions.RequestException as e: error_msg = f"Request error: {str(e)}" logger.error(f"Webhook {webhook_id} request error: {error_msg}") self._log_delivery_failure(webhook_id, alert_id, error_msg, attempt_number) except Exception as e: error_msg = f"Unexpected error: {str(e)}" logger.error(f"Webhook {webhook_id} unexpected error: {error_msg}") self._log_delivery_failure(webhook_id, alert_id, error_msg, attempt_number) # Retry if attempts remaining if attempt_number < webhook.retry_count: delay = self._calculate_retry_delay(attempt_number) logger.info(f"Retrying webhook {webhook_id} in {delay} seconds") time.sleep(delay) return self.deliver_webhook(webhook_id, alert_id, attempt_number + 1) return False def _log_delivery_failure(self, webhook_id: int, alert_id: int, error_message: str, attempt_number: int): """Log a failed delivery attempt.""" log_entry = WebhookDeliveryLog( webhook_id=webhook_id, alert_id=alert_id, status='failed', response_code=None, response_body=None, error_message=error_message[:500], # Limit error message length attempt_number=attempt_number, delivered_at=datetime.now(timezone.utc) ) self.db.add(log_entry) self.db.commit() def _calculate_retry_delay(self, attempt_number: int) -> int: """ Calculate exponential backoff delay for retries. Args: attempt_number: Current attempt number Returns: Delay in seconds """ # Exponential backoff: 2^attempt seconds (2, 4, 8, 16...) return min(2 ** attempt_number, 60) # Cap at 60 seconds def _build_payload(self, webhook: Webhook, alert: Alert) -> Tuple[Any, str]: """ Build payload for webhook delivery using template if configured. Args: webhook: Webhook object with optional template configuration alert: Alert object Returns: Tuple of (payload, content_type): - payload: Rendered payload (dict for JSON, string for text) - content_type: Content-Type header value """ # Get related scan scan = self.db.query(Scan).filter(Scan.id == alert.scan_id).first() # Get related rule rule = self.db.query(AlertRule).filter(AlertRule.id == alert.rule_id).first() # If webhook has a custom template, use it if webhook.template: template_service = get_template_service() context = template_service.build_context( alert=alert, scan=scan, rule=rule, app_name=APP_NAME, app_version=APP_VERSION, app_url=REPO_URL ) rendered, error = template_service.render( webhook.template, context, webhook.template_format or 'json' ) if error: logger.error(f"Template rendering error for webhook {webhook.id}: {error}") # Fall back to default payload return self._build_default_payload(alert, scan, rule), 'application/json' # Determine content type if webhook.content_type_override: content_type = webhook.content_type_override elif webhook.template_format == 'text': content_type = 'text/plain' else: content_type = 'application/json' # For JSON format, parse the rendered string back to a dict # For text format, return as string if webhook.template_format == 'json': try: payload = json.loads(rendered) except json.JSONDecodeError: logger.error(f"Failed to parse rendered JSON template for webhook {webhook.id}") return self._build_default_payload(alert, scan, rule), 'application/json' else: payload = rendered return payload, content_type # No template - use default payload return self._build_default_payload(alert, scan, rule), 'application/json' def _build_default_payload(self, alert: Alert, scan: Optional[Scan], rule: Optional[AlertRule]) -> Dict[str, Any]: """ Build default JSON payload for webhook delivery. Args: alert: Alert object scan: Scan object (optional) rule: AlertRule object (optional) Returns: Dict containing alert details in generic JSON format """ payload = { "event": "alert.created", "alert": { "id": alert.id, "type": alert.alert_type, "severity": alert.severity, "message": alert.message, "ip_address": alert.ip_address, "port": alert.port, "acknowledged": alert.acknowledged, "created_at": alert.created_at.isoformat() if alert.created_at else None }, "scan": { "id": scan.id if scan else None, "title": scan.title if scan else None, "timestamp": scan.timestamp.isoformat() if scan and scan.timestamp else None, "status": scan.status if scan else None }, "rule": { "id": rule.id if rule else None, "name": rule.name if rule else None, "type": rule.rule_type if rule else None, "threshold": rule.threshold if rule else None } } return payload def test_webhook(self, webhook_id: int) -> Dict[str, Any]: """ Send a test payload to a webhook. Args: webhook_id: ID of webhook to test Returns: Dict with test result details """ webhook = self.db.query(Webhook).filter(Webhook.id == webhook_id).first() if not webhook: return { 'success': False, 'message': 'Webhook not found', 'status_code': None } # Build test payload - use template if configured if webhook.template: template_service = get_template_service() rendered, error = template_service.render_test_payload( webhook.template, webhook.template_format or 'json' ) if error: return { 'success': False, 'message': f'Template error: {error}', 'status_code': None } # Determine content type if webhook.content_type_override: content_type = webhook.content_type_override elif webhook.template_format == 'text': content_type = 'text/plain' else: content_type = 'application/json' # Parse JSON template if webhook.template_format == 'json': try: payload = json.loads(rendered) except json.JSONDecodeError: return { 'success': False, 'message': 'Template rendered invalid JSON', 'status_code': None } else: payload = rendered else: # Default test payload payload = { "event": "webhook.test", "message": "This is a test webhook from SneakyScanner", "timestamp": datetime.now(timezone.utc).isoformat(), "webhook": { "id": webhook.id, "name": webhook.name } } content_type = 'application/json' # Prepare headers headers = {'Content-Type': content_type} if webhook.custom_headers: try: custom_headers = json.loads(webhook.custom_headers) headers.update(custom_headers) except json.JSONDecodeError: pass # Prepare authentication auth = None if webhook.auth_type == 'bearer' and webhook.auth_token: decrypted_token = self._decrypt_value(webhook.auth_token) headers['Authorization'] = f'Bearer {decrypted_token}' elif webhook.auth_type == 'basic' and webhook.auth_token: decrypted_token = self._decrypt_value(webhook.auth_token) if ':' in decrypted_token: username, password = decrypted_token.split(':', 1) auth = HTTPBasicAuth(username, password) # Execute test request try: timeout = webhook.timeout or 10 # Use appropriate parameter based on payload type if isinstance(payload, dict): # JSON payload response = requests.post( webhook.url, json=payload, headers=headers, auth=auth, timeout=timeout ) else: # Text payload response = requests.post( webhook.url, data=payload, headers=headers, auth=auth, timeout=timeout ) return { 'success': response.status_code < 400, 'message': f'HTTP {response.status_code}', 'status_code': response.status_code, 'response_body': response.text[:500] } except requests.exceptions.Timeout: return { 'success': False, 'message': f'Request timeout after {timeout} seconds', 'status_code': None } except requests.exceptions.ConnectionError as e: return { 'success': False, 'message': f'Connection error: {str(e)}', 'status_code': None } except Exception as e: return { 'success': False, 'message': f'Error: {str(e)}', 'status_code': None }