"""State management models for alert deduplication and change detection.""" from dataclasses import dataclass, field from datetime import datetime from typing import Any, Optional @dataclass class SentAlertRecord: """Record of a sent alert for deduplication.""" dedup_key: str alert_type: str sent_at: datetime forecast_hour: str def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "dedup_key": self.dedup_key, "alert_type": self.alert_type, "sent_at": self.sent_at.isoformat(), "forecast_hour": self.forecast_hour, } @classmethod def from_dict(cls, data: dict[str, Any]) -> "SentAlertRecord": """Create from dictionary. Args: data: The serialized record dict. Returns: A SentAlertRecord instance. """ return cls( dedup_key=data["dedup_key"], alert_type=data["alert_type"], sent_at=datetime.fromisoformat(data["sent_at"]), forecast_hour=data["forecast_hour"], ) @dataclass class AlertSnapshot: """Snapshot of an alert for change detection between runs.""" alert_type: str extreme_value: float threshold: float start_time: str end_time: str hour_count: int captured_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "alert_type": self.alert_type, "extreme_value": self.extreme_value, "threshold": self.threshold, "start_time": self.start_time, "end_time": self.end_time, "hour_count": self.hour_count, "captured_at": self.captured_at.isoformat(), } @classmethod def from_dict(cls, data: dict[str, Any]) -> "AlertSnapshot": """Create from dictionary. Args: data: The serialized snapshot dict. Returns: An AlertSnapshot instance. """ return cls( alert_type=data["alert_type"], extreme_value=data["extreme_value"], threshold=data["threshold"], start_time=data["start_time"], end_time=data["end_time"], hour_count=data["hour_count"], captured_at=datetime.fromisoformat(data["captured_at"]), ) @dataclass class AlertState: """State container for tracking sent alerts and change detection.""" sent_alerts: dict[str, SentAlertRecord] = field(default_factory=dict) last_updated: datetime = field(default_factory=datetime.now) previous_alert_snapshots: dict[str, AlertSnapshot] = field(default_factory=dict) last_ai_summary_sent: Optional[datetime] = None def is_duplicate(self, dedup_key: str) -> bool: """Check if an alert with this dedup key has already been sent. Args: dedup_key: The deduplication key to check. Returns: True if this alert has already been sent. """ return dedup_key in self.sent_alerts def record_sent(self, dedup_key: str, alert_type: str, forecast_hour: str) -> None: """Record that an alert was sent. Args: dedup_key: The deduplication key. alert_type: The type of alert. forecast_hour: The forecast hour this alert was for. """ self.sent_alerts[dedup_key] = SentAlertRecord( dedup_key=dedup_key, alert_type=alert_type, sent_at=datetime.now(), forecast_hour=forecast_hour, ) self.last_updated = datetime.now() def purge_old_records(self, window_hours: int) -> int: """Remove records older than the deduplication window. Args: window_hours: Number of hours to retain records. Returns: Number of records purged. """ cutoff = datetime.now() original_count = len(self.sent_alerts) self.sent_alerts = { key: record for key, record in self.sent_alerts.items() if (cutoff - record.sent_at).total_seconds() < (window_hours * 3600) } purged = original_count - len(self.sent_alerts) if purged > 0: self.last_updated = datetime.now() return purged def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "sent_alerts": { key: record.to_dict() for key, record in self.sent_alerts.items() }, "last_updated": self.last_updated.isoformat(), "previous_alert_snapshots": { key: snapshot.to_dict() for key, snapshot in self.previous_alert_snapshots.items() }, "last_ai_summary_sent": ( self.last_ai_summary_sent.isoformat() if self.last_ai_summary_sent else None ), } @classmethod def from_dict(cls, data: dict[str, Any]) -> "AlertState": """Create from dictionary. Args: data: The serialized state dict. Returns: An AlertState instance. """ sent_alerts = { key: SentAlertRecord.from_dict(record_data) for key, record_data in data.get("sent_alerts", {}).items() } last_updated_str = data.get("last_updated") last_updated = ( datetime.fromisoformat(last_updated_str) if last_updated_str else datetime.now() ) previous_alert_snapshots = { key: AlertSnapshot.from_dict(snapshot_data) for key, snapshot_data in data.get("previous_alert_snapshots", {}).items() } last_ai_summary_str = data.get("last_ai_summary_sent") last_ai_summary_sent = ( datetime.fromisoformat(last_ai_summary_str) if last_ai_summary_str else None ) return cls( sent_alerts=sent_alerts, last_updated=last_updated, previous_alert_snapshots=previous_alert_snapshots, last_ai_summary_sent=last_ai_summary_sent, )