# Files: weather-alerts/app/models/state.py (207 lines, 6.2 KiB, Python)

"""State management models for alert deduplication and change detection."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
@dataclass
class SentAlertRecord:
    """An alert that has already been sent, kept for deduplication."""

    dedup_key: str
    alert_type: str
    sent_at: datetime
    forecast_hour: str

    def to_dict(self) -> dict[str, Any]:
        """Render this record as a JSON-serializable mapping.

        Returns:
            A dict holding the four fields, with ``sent_at`` converted to
            its ISO 8601 string form.
        """
        payload: dict[str, Any] = {}
        payload["dedup_key"] = self.dedup_key
        payload["alert_type"] = self.alert_type
        # datetime objects are not JSON-serializable; store the ISO string.
        payload["sent_at"] = self.sent_at.isoformat()
        payload["forecast_hour"] = self.forecast_hour
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SentAlertRecord":
        """Rebuild a record from the mapping produced by ``to_dict``.

        Args:
            data: The serialized record dict.

        Returns:
            A SentAlertRecord instance.

        Raises:
            KeyError: If any of the four expected keys is absent.
        """
        key = data["dedup_key"]
        kind = data["alert_type"]
        sent = datetime.fromisoformat(data["sent_at"])
        hour = data["forecast_hour"]
        return cls(
            dedup_key=key,
            alert_type=kind,
            sent_at=sent,
            forecast_hour=hour,
        )
@dataclass
class AlertSnapshot:
    """Snapshot of an alert for change detection between runs.

    The class only stores and (de)serializes its fields; what the values
    mean (units, time-string format) is decided by the code that builds
    snapshots.

    Attributes:
        captured_at: When the snapshot was created. Defaults to
            ``datetime.now()`` (naive local time) at construction.
    """

    alert_type: str
    extreme_value: float
    threshold: float
    start_time: str
    end_time: str
    hour_count: int
    captured_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict of all fields, with ``captured_at`` as an ISO 8601 string.
        """
        return {
            "alert_type": self.alert_type,
            "extreme_value": self.extreme_value,
            "threshold": self.threshold,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "hour_count": self.hour_count,
            "captured_at": self.captured_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AlertSnapshot":
        """Create from dictionary.

        ``captured_at`` is optional on load: when the key is missing or its
        value is empty/null, the field's own default (the current time) is
        used instead of raising.

        Args:
            data: The serialized snapshot dict.

        Returns:
            An AlertSnapshot instance.

        Raises:
            KeyError: If any field other than ``captured_at`` is absent.
        """
        values: dict[str, Any] = {
            "alert_type": data["alert_type"],
            "extreme_value": data["extreme_value"],
            "threshold": data["threshold"],
            "start_time": data["start_time"],
            "end_time": data["end_time"],
            "hour_count": data["hour_count"],
        }
        captured_at = data.get("captured_at")
        if captured_at:
            values["captured_at"] = datetime.fromisoformat(captured_at)
        # Otherwise leave the key out so the dataclass default_factory applies.
        return cls(**values)
@dataclass
class AlertState:
    """State container for tracking sent alerts and change detection.

    Timestamps created by this class come from ``datetime.now()`` and are
    therefore naive local times.

    Attributes:
        sent_alerts: Sent-alert records keyed by their deduplication key.
        last_updated: When the state last changed through ``record_sent`` or
            a purge that removed at least one record; defaults to
            construction time.
        previous_alert_snapshots: Alert snapshots keyed by string; the key
            scheme is chosen by the caller.
        last_ai_summary_sent: Optional timestamp that this class only stores
            and serializes (presumably the last AI summary send time).
    """

    sent_alerts: dict[str, SentAlertRecord] = field(default_factory=dict)
    last_updated: datetime = field(default_factory=datetime.now)
    previous_alert_snapshots: dict[str, AlertSnapshot] = field(default_factory=dict)
    last_ai_summary_sent: Optional[datetime] = None

    def is_duplicate(self, dedup_key: str) -> bool:
        """Check if an alert with this dedup key has already been sent.

        Only membership in ``sent_alerts`` is checked; record age is not
        considered here (see ``purge_old_records``).

        Args:
            dedup_key: The deduplication key to check.

        Returns:
            True if this alert has already been sent.
        """
        return dedup_key in self.sent_alerts

    def record_sent(self, dedup_key: str, alert_type: str, forecast_hour: str) -> None:
        """Record that an alert was sent.

        An existing record under the same key is overwritten.

        Args:
            dedup_key: The deduplication key.
            alert_type: The type of alert.
            forecast_hour: The forecast hour this alert was for.
        """
        # One timestamp for both fields so they agree exactly.
        now = datetime.now()
        self.sent_alerts[dedup_key] = SentAlertRecord(
            dedup_key=dedup_key,
            alert_type=alert_type,
            sent_at=now,
            forecast_hour=forecast_hour,
        )
        self.last_updated = now

    def purge_old_records(self, window_hours: int) -> int:
        """Remove records older than the deduplication window.

        A record is kept only while its age is strictly less than the
        window, so a record exactly ``window_hours`` old is purged.
        ``last_updated`` is touched only when something was removed.

        Args:
            window_hours: Number of hours to retain records.

        Returns:
            Number of records purged.
        """
        now = datetime.now()
        window_seconds = window_hours * 3600
        kept = {
            key: record
            for key, record in self.sent_alerts.items()
            if (now - record.sent_at).total_seconds() < window_seconds
        }
        purged = len(self.sent_alerts) - len(kept)
        self.sent_alerts = kept
        if purged > 0:
            self.last_updated = now
        return purged

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict with nested record/snapshot dicts and ISO 8601 strings
            for timestamps; ``last_ai_summary_sent`` is ``None`` when unset.
        """
        return {
            "sent_alerts": {
                key: record.to_dict() for key, record in self.sent_alerts.items()
            },
            "last_updated": self.last_updated.isoformat(),
            "previous_alert_snapshots": {
                key: snapshot.to_dict()
                for key, snapshot in self.previous_alert_snapshots.items()
            },
            "last_ai_summary_sent": (
                self.last_ai_summary_sent.isoformat()
                if self.last_ai_summary_sent is not None
                else None
            ),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AlertState":
        """Create from dictionary.

        Every top-level key is optional. A missing or null/empty value
        yields an empty mapping for the two collections, the current time
        for ``last_updated``, and ``None`` for ``last_ai_summary_sent``.

        Args:
            data: The serialized state dict.

        Returns:
            An AlertState instance.
        """
        # `or {}` also covers an explicit null, which `.get(key, {})` would
        # pass through as None and then fail on `.items()`.
        sent_alerts = {
            key: SentAlertRecord.from_dict(record_data)
            for key, record_data in (data.get("sent_alerts") or {}).items()
        }

        last_updated_str = data.get("last_updated")
        last_updated = (
            datetime.fromisoformat(last_updated_str)
            if last_updated_str
            else datetime.now()
        )

        previous_alert_snapshots = {
            key: AlertSnapshot.from_dict(snapshot_data)
            for key, snapshot_data in (
                data.get("previous_alert_snapshots") or {}
            ).items()
        }

        last_ai_summary_str = data.get("last_ai_summary_sent")
        last_ai_summary_sent = (
            datetime.fromisoformat(last_ai_summary_str)
            if last_ai_summary_str
            else None
        )

        return cls(
            sent_alerts=sent_alerts,
            last_updated=last_updated,
            previous_alert_snapshots=previous_alert_snapshots,
            last_ai_summary_sent=last_ai_summary_sent,
        )