284 lines
9.5 KiB
Python
284 lines
9.5 KiB
Python
"""Alert aggregator service for combining multiple alerts of the same type."""
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
from app.models.alerts import AggregatedAlert, AlertType, TriggeredAlert
|
|
from app.utils.logging_config import get_logger
|
|
|
|
|
|
class AlertAggregator:
    """Aggregates multiple alerts of the same type into single notifications.

    Severe weather alerts are passed through one-to-one. Every other alert
    type is collapsed into a single summary per type that records the hours
    that triggered, the first/last hour, and the most extreme value seen.
    """

    # Alert types where lower values are worse (e.g., low temperature)
    LOWER_IS_WORSE: set[AlertType] = {AlertType.TEMPERATURE_LOW}

    # Display title per alert type. Types missing here fall back to
    # "<alert_type.value> Alert" in _build_title.
    _TITLES: dict[AlertType, str] = {
        AlertType.TEMPERATURE_LOW: "Low Temperature Alert",
        AlertType.TEMPERATURE_HIGH: "High Temperature Alert",
        AlertType.PRECIPITATION: "Precipitation Alert",
        AlertType.WIND_SPEED: "High Wind Alert",
        AlertType.WIND_GUST: "Wind Gust Alert",
    }

    # Wording used by _build_summary_message, per alert type:
    # (range lead-in, extreme-value label, unit suffix, threshold relation).
    # The unit suffix carries its own leading space where one is wanted.
    _SUMMARY_WORDING: dict[AlertType, tuple[str, str, str, str]] = {
        AlertType.TEMPERATURE_LOW: ("Low temps", "Lowest", "°F", "below"),
        AlertType.TEMPERATURE_HIGH: ("High temps", "Highest", "°F", "above"),
        AlertType.PRECIPITATION: ("Precipitation likely", "Peak", "%", "above"),
        AlertType.WIND_SPEED: ("High winds", "Peak", " mph", "above"),
        AlertType.WIND_GUST: ("Wind gusts", "Peak", " mph", "above"),
    }

    def __init__(self) -> None:
        """Initialize the alert aggregator."""
        self.logger = get_logger(__name__)

    def aggregate(
        self,
        alerts: list[TriggeredAlert],
    ) -> list[AggregatedAlert]:
        """Aggregate alerts by type into summary notifications.

        Severe weather alerts pass through unchanged (one output per input).
        Other alert types are grouped and combined into a single notification
        per type with time range and extreme value information.

        Args:
            alerts: List of triggered alerts to aggregate.

        Returns:
            Severe weather alerts first (one AggregatedAlert each, in input
            order), followed by one AggregatedAlert per remaining alert type
            in order of first appearance. Empty list if ``alerts`` is empty.
        """
        if not alerts:
            return []

        # Separate severe weather alerts from regular hourly alerts
        severe_alerts: list[TriggeredAlert] = []
        hourly_alerts: list[TriggeredAlert] = []
        for alert in alerts:
            if alert.alert_type == AlertType.SEVERE_WEATHER:
                severe_alerts.append(alert)
            else:
                hourly_alerts.append(alert)

        aggregated: list[AggregatedAlert] = [
            # Severe weather alerts are converted 1:1 (pass through)
            *self._convert_severe_alerts(severe_alerts),
            # Everything else is summarised per alert type
            *self._aggregate_by_type(hourly_alerts),
        ]

        self.logger.info(
            "alerts_aggregated",
            input_count=len(alerts),
            output_count=len(aggregated),
            severe_count=len(severe_alerts),
        )

        return aggregated

    def _convert_severe_alerts(
        self,
        alerts: list[TriggeredAlert],
    ) -> list[AggregatedAlert]:
        """Convert severe weather alerts to AggregatedAlert format.

        Severe weather alerts are not aggregated - each one becomes
        its own AggregatedAlert for individual notification, with its single
        forecast hour used as start, end and extreme hour.

        Args:
            alerts: List of severe weather triggered alerts.

        Returns:
            List of AggregatedAlert, one per severe weather alert.
        """
        return [
            AggregatedAlert(
                alert_type=alert.alert_type,
                title=alert.title,
                message=alert.message,
                triggered_hours=[alert.forecast_hour],
                start_time=alert.forecast_hour,
                end_time=alert.forecast_hour,
                extreme_value=alert.value,
                extreme_hour=alert.forecast_hour,
                threshold=alert.threshold,
                created_at=alert.created_at,
            )
            for alert in alerts
        ]

    def _aggregate_by_type(
        self,
        alerts: list[TriggeredAlert],
    ) -> list[AggregatedAlert]:
        """Aggregate hourly alerts by alert type.

        Args:
            alerts: List of hourly triggered alerts.

        Returns:
            List of AggregatedAlert, one per alert type, ordered by the first
            appearance of each type in ``alerts``.
        """
        # Group alerts by type (dict insertion order keeps first-seen order)
        by_type: dict[AlertType, list[TriggeredAlert]] = defaultdict(list)
        for alert in alerts:
            by_type[alert.alert_type].append(alert)

        return [
            self._aggregate_type_group(alert_type, type_alerts)
            for alert_type, type_alerts in by_type.items()
        ]

    def _aggregate_type_group(
        self,
        alert_type: AlertType,
        alerts: list[TriggeredAlert],
    ) -> AggregatedAlert:
        """Create a single AggregatedAlert from a group of same-type alerts.

        Args:
            alert_type: The type of all alerts in the group.
            alerts: Non-empty list of alerts of the same type.

        Returns:
            A single AggregatedAlert summarizing the group.

        Raises:
            IndexError: If ``alerts`` is empty (the grouping in
                ``_aggregate_by_type`` never produces an empty group).
        """
        # Sort by forecast hour for chronological ordering
        sorted_alerts = sorted(alerts, key=lambda a: a.forecast_hour)

        triggered_hours = [a.forecast_hour for a in sorted_alerts]
        start_time = sorted_alerts[0].forecast_hour
        end_time = sorted_alerts[-1].forecast_hour

        # Find extreme value (lowest for LOWER_IS_WORSE types, highest for
        # others). On ties min/max keep the chronologically first alert.
        pick_extreme = min if alert_type in self.LOWER_IS_WORSE else max
        extreme_alert = pick_extreme(sorted_alerts, key=lambda a: a.value)

        extreme_value = extreme_alert.value
        extreme_hour = extreme_alert.forecast_hour
        # Taken from the earliest alert; assumed identical across the group.
        threshold = sorted_alerts[0].threshold

        message = self._build_summary_message(
            alert_type=alert_type,
            start_time=start_time,
            end_time=end_time,
            extreme_value=extreme_value,
            extreme_hour=extreme_hour,
            threshold=threshold,
            hour_count=len(sorted_alerts),
        )
        title = self._build_title(alert_type)

        # NOTE(review): created_at is not passed here (unlike the severe
        # pass-through) - presumably AggregatedAlert supplies a default.
        return AggregatedAlert(
            alert_type=alert_type,
            title=title,
            message=message,
            triggered_hours=triggered_hours,
            start_time=start_time,
            end_time=end_time,
            extreme_value=extreme_value,
            extreme_hour=extreme_hour,
            threshold=threshold,
        )

    def _build_title(self, alert_type: AlertType) -> str:
        """Build a title for the aggregated alert.

        Args:
            alert_type: The type of alert.

        Returns:
            The fixed title for known types, otherwise
            ``"<alert_type.value> Alert"``.
        """
        return self._TITLES.get(alert_type, f"{alert_type.value} Alert")

    def _build_summary_message(
        self,
        alert_type: AlertType,
        start_time: str,
        end_time: str,
        extreme_value: float,
        extreme_hour: str,
        threshold: float,
        hour_count: int,
    ) -> str:
        """Build a summary message for the aggregated alert.

        Args:
            alert_type: The type of alert.
            start_time: First hour that triggered (YYYY-MM-DD-HH format).
            end_time: Last hour that triggered (YYYY-MM-DD-HH format).
            extreme_value: The most extreme value recorded.
            extreme_hour: Hour when extreme value occurred.
            threshold: The threshold that was exceeded.
            hour_count: Number of hours that triggered.

        Returns:
            Human-readable summary message. Types without an entry in
            ``_SUMMARY_WORDING`` get a generic message that mentions only the
            time range and hour count.
        """
        # Format times for display
        start_display = self._format_hour_display(start_time)
        end_display = self._format_hour_display(end_time)
        extreme_display = self._format_hour_display(extreme_hour)

        wording = self._SUMMARY_WORDING.get(alert_type)
        if wording is None:
            # Fallback for unknown types
            return (
                f"Alert from {start_display} - {end_display}. "
                f"({hour_count} hours affected)"
            )

        lead_in, extreme_label, unit, relation = wording
        return (
            f"{lead_in} from {start_display} - {end_display}. "
            f"{extreme_label}: {extreme_value:.0f}{unit} at {extreme_display}. "
            f"({hour_count} hours {relation} {threshold:.0f}{unit})"
        )

    def _format_hour_display(self, hour_key: str) -> str:
        """Format an hour key for human display.

        Only the hour of day is shown; the date part of the key is dropped.

        Args:
            hour_key: Hour key in YYYY-MM-DD-HH format.

        Returns:
            Human-readable time string (e.g., "3 PM" or "6 AM"), or
            ``hour_key`` unchanged if it cannot be parsed/formatted.
        """
        try:
            dt = datetime.strptime(hour_key, "%Y-%m-%d-%H")
            # "%-I" (unpadded hour) is a platform-specific extension and
            # raises ValueError where unsupported (e.g. Windows), which this
            # handler would swallow and return the raw key for every call.
            # Use the portable zero-padded "%I" and strip the padding; the
            # hour is 01-12 so at most one leading "0" is removed.
            return dt.strftime("%I %p").lstrip("0")
        except ValueError:
            # Unrecognised key: show it verbatim rather than fail the alert.
            return hour_key
|