Files
weather-alerts/app/services/alert_aggregator.py
2026-01-26 15:08:24 -06:00

284 lines
9.5 KiB
Python

"""Alert aggregator service for combining multiple alerts of the same type."""
from collections import defaultdict
from datetime import datetime
from app.models.alerts import AggregatedAlert, AlertType, TriggeredAlert
from app.utils.logging_config import get_logger
class AlertAggregator:
    """Aggregates multiple alerts of the same type into single notifications.

    Severe weather alerts are converted one-to-one, while every other alert
    type is collapsed into a single summary carrying the affected time range
    and the most extreme value seen in the group.
    """

    # Alert types where lower values are worse (e.g., low temperature).
    # For every other type the highest value is treated as the extreme.
    LOWER_IS_WORSE: set[AlertType] = {AlertType.TEMPERATURE_LOW}

    def __init__(self) -> None:
        """Initialize the alert aggregator."""
        self.logger = get_logger(__name__)

    def aggregate(
        self,
        alerts: list[TriggeredAlert],
    ) -> list[AggregatedAlert]:
        """Aggregate alerts by type into summary notifications.

        Severe weather alerts pass through unchanged.
        Other alert types are grouped and combined into a single notification
        per type with time range and extreme value information.

        Args:
            alerts: List of triggered alerts to aggregate.

        Returns:
            List of aggregated alerts: one per severe weather alert first,
            followed by one per remaining alert type. An empty input yields
            an empty list (and nothing is logged).
        """
        if not alerts:
            return []

        # Separate severe weather alerts from regular hourly alerts
        severe_alerts: list[TriggeredAlert] = []
        hourly_alerts: list[TriggeredAlert] = []
        for alert in alerts:
            if alert.alert_type == AlertType.SEVERE_WEATHER:
                severe_alerts.append(alert)
            else:
                hourly_alerts.append(alert)

        aggregated: list[AggregatedAlert] = []

        # Convert severe weather alerts to AggregatedAlert (pass through)
        aggregated.extend(self._convert_severe_alerts(severe_alerts))

        # Aggregate hourly alerts by type
        aggregated.extend(self._aggregate_by_type(hourly_alerts))

        self.logger.info(
            "alerts_aggregated",
            input_count=len(alerts),
            output_count=len(aggregated),
            severe_count=len(severe_alerts),
        )
        return aggregated

    def _convert_severe_alerts(
        self,
        alerts: list[TriggeredAlert],
    ) -> list[AggregatedAlert]:
        """Convert severe weather alerts to AggregatedAlert format.

        Severe weather alerts are not aggregated - each one becomes
        its own AggregatedAlert for individual notification, so the time
        range and extreme hour all collapse to the alert's own forecast hour.

        Args:
            alerts: List of severe weather triggered alerts.

        Returns:
            List of AggregatedAlert, one per severe weather alert, in input
            order.
        """
        return [
            AggregatedAlert(
                alert_type=alert.alert_type,
                title=alert.title,
                message=alert.message,
                triggered_hours=[alert.forecast_hour],
                start_time=alert.forecast_hour,
                end_time=alert.forecast_hour,
                extreme_value=alert.value,
                extreme_hour=alert.forecast_hour,
                threshold=alert.threshold,
                created_at=alert.created_at,
            )
            for alert in alerts
        ]

    def _aggregate_by_type(
        self,
        alerts: list[TriggeredAlert],
    ) -> list[AggregatedAlert]:
        """Aggregate hourly alerts by alert type.

        Args:
            alerts: List of hourly triggered alerts.

        Returns:
            List of AggregatedAlert, one per alert type, ordered by the first
            appearance of each type in ``alerts``.
        """
        # Group alerts by type
        by_type: dict[AlertType, list[TriggeredAlert]] = defaultdict(list)
        for alert in alerts:
            by_type[alert.alert_type].append(alert)

        return [
            self._aggregate_type_group(alert_type, type_alerts)
            for alert_type, type_alerts in by_type.items()
        ]

    def _aggregate_type_group(
        self,
        alert_type: AlertType,
        alerts: list[TriggeredAlert],
    ) -> AggregatedAlert:
        """Create a single AggregatedAlert from a group of same-type alerts.

        Args:
            alert_type: The type of all alerts in the group.
            alerts: List of alerts of the same type. Must be non-empty; the
                first and last elements of the sorted group are read
                unconditionally.

        Returns:
            A single AggregatedAlert summarizing the group.
        """
        # Sort by forecast hour for chronological ordering
        sorted_alerts = sorted(alerts, key=lambda a: a.forecast_hour)

        # Collect all triggered hours
        triggered_hours = [a.forecast_hour for a in sorted_alerts]
        start_time = sorted_alerts[0].forecast_hour
        end_time = sorted_alerts[-1].forecast_hour

        # Find extreme value (lowest for low temp, highest for others).
        # On ties min()/max() keep the earliest hour because the list is
        # already in chronological order.
        if alert_type in self.LOWER_IS_WORSE:
            extreme_alert = min(sorted_alerts, key=lambda a: a.value)
        else:
            extreme_alert = max(sorted_alerts, key=lambda a: a.value)
        extreme_value = extreme_alert.value
        extreme_hour = extreme_alert.forecast_hour

        threshold = sorted_alerts[0].threshold  # Same for all alerts of same type

        # Build summary message
        message = self._build_summary_message(
            alert_type=alert_type,
            start_time=start_time,
            end_time=end_time,
            extreme_value=extreme_value,
            extreme_hour=extreme_hour,
            threshold=threshold,
            hour_count=len(sorted_alerts),
        )

        # Build title
        title = self._build_title(alert_type)

        # created_at is not supplied for aggregated groups.
        return AggregatedAlert(
            alert_type=alert_type,
            title=title,
            message=message,
            triggered_hours=triggered_hours,
            start_time=start_time,
            end_time=end_time,
            extreme_value=extreme_value,
            extreme_hour=extreme_hour,
            threshold=threshold,
        )

    def _build_title(self, alert_type: AlertType) -> str:
        """Build a title for the aggregated alert.

        Args:
            alert_type: The type of alert.

        Returns:
            Title string. Types without a dedicated title fall back to
            ``"<alert_type.value> Alert"``.
        """
        titles = {
            AlertType.TEMPERATURE_LOW: "Low Temperature Alert",
            AlertType.TEMPERATURE_HIGH: "High Temperature Alert",
            AlertType.PRECIPITATION: "Precipitation Alert",
            AlertType.WIND_SPEED: "High Wind Alert",
            AlertType.WIND_GUST: "Wind Gust Alert",
        }
        return titles.get(alert_type, f"{alert_type.value} Alert")

    def _build_summary_message(
        self,
        alert_type: AlertType,
        start_time: str,
        end_time: str,
        extreme_value: float,
        extreme_hour: str,
        threshold: float,
        hour_count: int,
    ) -> str:
        """Build a summary message for the aggregated alert.

        Args:
            alert_type: The type of alert.
            start_time: First hour that triggered (YYYY-MM-DD-HH format).
            end_time: Last hour that triggered (YYYY-MM-DD-HH format).
            extreme_value: The most extreme value recorded.
            extreme_hour: Hour when extreme value occurred.
            threshold: The threshold that was exceeded.
            hour_count: Number of hours that triggered.

        Returns:
            Human-readable summary message. Unknown alert types get a generic
            message that omits the extreme value and threshold.
        """
        # Format times for display
        start_display = self._format_hour_display(start_time)
        end_display = self._format_hour_display(end_time)
        extreme_display = self._format_hour_display(extreme_hour)

        # Build type-specific message
        if alert_type == AlertType.TEMPERATURE_LOW:
            return (
                f"Low temps from {start_display} - {end_display}. "
                f"Lowest: {extreme_value:.0f}°F at {extreme_display}. "
                f"({hour_count} hours below {threshold:.0f}°F)"
            )
        elif alert_type == AlertType.TEMPERATURE_HIGH:
            return (
                f"High temps from {start_display} - {end_display}. "
                f"Highest: {extreme_value:.0f}°F at {extreme_display}. "
                f"({hour_count} hours above {threshold:.0f}°F)"
            )
        elif alert_type == AlertType.PRECIPITATION:
            return (
                f"Precipitation likely from {start_display} - {end_display}. "
                f"Peak: {extreme_value:.0f}% at {extreme_display}. "
                f"({hour_count} hours above {threshold:.0f}%)"
            )
        elif alert_type == AlertType.WIND_SPEED:
            return (
                f"High winds from {start_display} - {end_display}. "
                f"Peak: {extreme_value:.0f} mph at {extreme_display}. "
                f"({hour_count} hours above {threshold:.0f} mph)"
            )
        elif alert_type == AlertType.WIND_GUST:
            return (
                f"Wind gusts from {start_display} - {end_display}. "
                f"Peak: {extreme_value:.0f} mph at {extreme_display}. "
                f"({hour_count} hours above {threshold:.0f} mph)"
            )

        # Fallback for unknown types
        return (
            f"Alert from {start_display} - {end_display}. "
            f"({hour_count} hours affected)"
        )

    def _format_hour_display(self, hour_key: str) -> str:
        """Format an hour key for human display.

        Args:
            hour_key: Hour key in YYYY-MM-DD-HH format.

        Returns:
            Human-readable time string (e.g., "3 PM" or "6 AM"). If
            ``hour_key`` does not parse, it is returned unchanged.
        """
        try:
            dt = datetime.strptime(hour_key, "%Y-%m-%d-%H")
        except ValueError:
            # Unparseable key: show it as-is rather than fail the whole alert.
            return hour_key

        # The "%-I" (unpadded hour) directive is a platform-specific strftime
        # extension; where it is unsupported (e.g. Windows) strftime raises
        # ValueError, which previously got swallowed and leaked the raw key
        # into every message. Derive the 12-hour value arithmetically and use
        # strftime only for the portable "%p" AM/PM marker.
        hour_12 = dt.hour % 12 or 12  # 0 -> 12 (midnight), 12 -> 12 (noon)
        return f"{hour_12} {dt.strftime('%p')}"