"""Alert aggregator service for combining multiple alerts of the same type.""" from collections import defaultdict from datetime import datetime from app.models.alerts import AggregatedAlert, AlertType, TriggeredAlert from app.utils.logging_config import get_logger class AlertAggregator: """Aggregates multiple alerts of the same type into single notifications.""" # Alert types where lower values are worse (e.g., low temperature) LOWER_IS_WORSE: set[AlertType] = {AlertType.TEMPERATURE_LOW} def __init__(self) -> None: """Initialize the alert aggregator.""" self.logger = get_logger(__name__) def aggregate( self, alerts: list[TriggeredAlert], ) -> list[AggregatedAlert]: """Aggregate alerts by type into summary notifications. Severe weather alerts pass through unchanged. Other alert types are grouped and combined into a single notification per type with time range and extreme value information. Args: alerts: List of triggered alerts to aggregate. Returns: List of aggregated alerts (one per alert type). """ if not alerts: return [] # Separate severe weather alerts from regular hourly alerts severe_alerts: list[TriggeredAlert] = [] hourly_alerts: list[TriggeredAlert] = [] for alert in alerts: if alert.alert_type == AlertType.SEVERE_WEATHER: severe_alerts.append(alert) else: hourly_alerts.append(alert) aggregated: list[AggregatedAlert] = [] # Convert severe weather alerts to AggregatedAlert (pass through) aggregated.extend(self._convert_severe_alerts(severe_alerts)) # Aggregate hourly alerts by type aggregated.extend(self._aggregate_by_type(hourly_alerts)) self.logger.info( "alerts_aggregated", input_count=len(alerts), output_count=len(aggregated), severe_count=len(severe_alerts), ) return aggregated def _convert_severe_alerts( self, alerts: list[TriggeredAlert], ) -> list[AggregatedAlert]: """Convert severe weather alerts to AggregatedAlert format. Severe weather alerts are not aggregated - each one becomes its own AggregatedAlert for individual notification. Args: alerts: List of severe weather triggered alerts. Returns: List of AggregatedAlert, one per severe weather alert. """ return [ AggregatedAlert( alert_type=alert.alert_type, title=alert.title, message=alert.message, triggered_hours=[alert.forecast_hour], start_time=alert.forecast_hour, end_time=alert.forecast_hour, extreme_value=alert.value, extreme_hour=alert.forecast_hour, threshold=alert.threshold, created_at=alert.created_at, ) for alert in alerts ] def _aggregate_by_type( self, alerts: list[TriggeredAlert], ) -> list[AggregatedAlert]: """Aggregate hourly alerts by alert type. Args: alerts: List of hourly triggered alerts. Returns: List of AggregatedAlert, one per alert type. """ # Group alerts by type by_type: dict[AlertType, list[TriggeredAlert]] = defaultdict(list) for alert in alerts: by_type[alert.alert_type].append(alert) aggregated: list[AggregatedAlert] = [] for alert_type, type_alerts in by_type.items(): aggregated_alert = self._aggregate_type_group(alert_type, type_alerts) aggregated.append(aggregated_alert) return aggregated def _aggregate_type_group( self, alert_type: AlertType, alerts: list[TriggeredAlert], ) -> AggregatedAlert: """Create a single AggregatedAlert from a group of same-type alerts. Args: alert_type: The type of all alerts in the group. alerts: List of alerts of the same type. Returns: A single AggregatedAlert summarizing the group. """ # Sort by forecast hour for chronological ordering sorted_alerts = sorted(alerts, key=lambda a: a.forecast_hour) # Collect all triggered hours triggered_hours = [a.forecast_hour for a in sorted_alerts] start_time = sorted_alerts[0].forecast_hour end_time = sorted_alerts[-1].forecast_hour # Find extreme value (lowest for low temp, highest for others) if alert_type in self.LOWER_IS_WORSE: extreme_alert = min(sorted_alerts, key=lambda a: a.value) else: extreme_alert = max(sorted_alerts, key=lambda a: a.value) extreme_value = extreme_alert.value extreme_hour = extreme_alert.forecast_hour threshold = sorted_alerts[0].threshold # Same for all alerts of same type # Build summary message message = self._build_summary_message( alert_type=alert_type, start_time=start_time, end_time=end_time, extreme_value=extreme_value, extreme_hour=extreme_hour, threshold=threshold, hour_count=len(sorted_alerts), ) # Build title title = self._build_title(alert_type) return AggregatedAlert( alert_type=alert_type, title=title, message=message, triggered_hours=triggered_hours, start_time=start_time, end_time=end_time, extreme_value=extreme_value, extreme_hour=extreme_hour, threshold=threshold, ) def _build_title(self, alert_type: AlertType) -> str: """Build a title for the aggregated alert. Args: alert_type: The type of alert. Returns: Title string. """ titles = { AlertType.TEMPERATURE_LOW: "Low Temperature Alert", AlertType.TEMPERATURE_HIGH: "High Temperature Alert", AlertType.PRECIPITATION: "Precipitation Alert", AlertType.WIND_SPEED: "High Wind Alert", AlertType.WIND_GUST: "Wind Gust Alert", } return titles.get(alert_type, f"{alert_type.value} Alert") def _build_summary_message( self, alert_type: AlertType, start_time: str, end_time: str, extreme_value: float, extreme_hour: str, threshold: float, hour_count: int, ) -> str: """Build a summary message for the aggregated alert. Args: alert_type: The type of alert. start_time: First hour that triggered (YYYY-MM-DD-HH format). end_time: Last hour that triggered (YYYY-MM-DD-HH format). extreme_value: The most extreme value recorded. extreme_hour: Hour when extreme value occurred. threshold: The threshold that was exceeded. hour_count: Number of hours that triggered. Returns: Human-readable summary message. """ # Format times for display start_display = self._format_hour_display(start_time) end_display = self._format_hour_display(end_time) extreme_display = self._format_hour_display(extreme_hour) # Build type-specific message if alert_type == AlertType.TEMPERATURE_LOW: return ( f"Low temps from {start_display} - {end_display}. " f"Lowest: {extreme_value:.0f}°F at {extreme_display}. " f"({hour_count} hours below {threshold:.0f}°F)" ) elif alert_type == AlertType.TEMPERATURE_HIGH: return ( f"High temps from {start_display} - {end_display}. " f"Highest: {extreme_value:.0f}°F at {extreme_display}. " f"({hour_count} hours above {threshold:.0f}°F)" ) elif alert_type == AlertType.PRECIPITATION: return ( f"Precipitation likely from {start_display} - {end_display}. " f"Peak: {extreme_value:.0f}% at {extreme_display}. " f"({hour_count} hours above {threshold:.0f}%)" ) elif alert_type == AlertType.WIND_SPEED: return ( f"High winds from {start_display} - {end_display}. " f"Peak: {extreme_value:.0f} mph at {extreme_display}. " f"({hour_count} hours above {threshold:.0f} mph)" ) elif alert_type == AlertType.WIND_GUST: return ( f"Wind gusts from {start_display} - {end_display}. " f"Peak: {extreme_value:.0f} mph at {extreme_display}. " f"({hour_count} hours above {threshold:.0f} mph)" ) # Fallback for unknown types return ( f"Alert from {start_display} - {end_display}. " f"({hour_count} hours affected)" ) def _format_hour_display(self, hour_key: str) -> str: """Format an hour key for human display. Args: hour_key: Hour key in YYYY-MM-DD-HH format. Returns: Human-readable time string (e.g., "3 PM" or "6 AM"). """ try: dt = datetime.strptime(hour_key, "%Y-%m-%d-%H") return dt.strftime("%-I %p") except ValueError: return hour_key