"""Notification service for sending alerts via ntfy.""" from dataclasses import dataclass from typing import Optional, Union from app.models.alerts import AggregatedAlert, AlertType, TriggeredAlert from app.utils.http_client import HttpClient from app.utils.logging_config import get_logger # Type alias for alerts that can be sent SendableAlert = Union[TriggeredAlert, AggregatedAlert] @dataclass class NotificationResult: """Result of sending a notification.""" alert: SendableAlert success: bool error: Optional[str] = None class NotificationServiceError(Exception): """Raised when notification service encounters an error.""" pass class NotificationService: """Service for sending notifications via ntfy.""" # Map alert types to emoji tags ALERT_TYPE_TAGS: dict[AlertType, list[str]] = { AlertType.TEMPERATURE_LOW: ["cold_face", "thermometer"], AlertType.TEMPERATURE_HIGH: ["hot_face", "thermometer"], AlertType.PRECIPITATION: ["cloud_with_rain", "umbrella"], AlertType.WIND_SPEED: ["wind_face", "dash"], AlertType.WIND_GUST: ["tornado", "dash"], AlertType.SEVERE_WEATHER: ["rotating_light", "warning"], } def __init__( self, server_url: str, topic: str, access_token: str = "", priority: str = "high", default_tags: Optional[list[str]] = None, http_client: Optional[HttpClient] = None, ) -> None: """Initialize the notification service. Args: server_url: The ntfy server URL. topic: The topic to publish to. access_token: Optional bearer token for authentication. priority: Default notification priority. default_tags: Default tags to include with notifications. http_client: Optional HTTP client instance. """ self.server_url = server_url.rstrip("/") self.topic = topic self.access_token = access_token self.priority = priority self.default_tags = default_tags or ["cloud", "warning"] self.http_client = http_client or HttpClient() self.logger = get_logger(__name__) def send(self, alert: SendableAlert) -> NotificationResult: """Send a single alert notification. Args: alert: The triggered or aggregated alert to send. Returns: NotificationResult indicating success or failure. """ url = f"{self.server_url}/{self.topic}" # Build headers headers = { "Title": alert.title, "Priority": self._get_priority(alert), "Tags": ",".join(self._get_tags(alert)), } if self.access_token: headers["Authorization"] = f"Bearer {self.access_token}" self.logger.debug( "sending_notification", alert_type=alert.alert_type.value, title=alert.title, ) response = self.http_client.post( url, data=alert.message.encode("utf-8"), headers=headers, ) if response.success: self.logger.info( "notification_sent", alert_type=alert.alert_type.value, dedup_key=alert.dedup_key, ) return NotificationResult(alert=alert, success=True) else: error_msg = f"HTTP {response.status_code}: {response.text[:100]}" self.logger.error( "notification_failed", alert_type=alert.alert_type.value, error=error_msg, ) return NotificationResult(alert=alert, success=False, error=error_msg) def send_batch( self, alerts: list[SendableAlert], ) -> list[NotificationResult]: """Send multiple alert notifications. Args: alerts: List of triggered or aggregated alerts to send. Returns: List of NotificationResult for each alert. """ if not alerts: self.logger.info("no_alerts_to_send") return [] results: list[NotificationResult] = [] for alert in alerts: result = self.send(alert) results.append(result) success_count = sum(1 for r in results if r.success) self.logger.info( "batch_send_complete", total=len(alerts), success=success_count, failed=len(alerts) - success_count, ) return results def _get_priority(self, alert: SendableAlert) -> str: """Determine notification priority based on alert type. Args: alert: The triggered or aggregated alert. Returns: Priority string for ntfy. """ # Severe weather always gets urgent priority if alert.alert_type == AlertType.SEVERE_WEATHER: return "urgent" return self.priority def _get_tags(self, alert: SendableAlert) -> list[str]: """Get notification tags for an alert. Args: alert: The triggered or aggregated alert. Returns: List of emoji tags for ntfy. """ # Start with alert-specific tags tags = list(self.ALERT_TYPE_TAGS.get(alert.alert_type, self.default_tags)) return tags