234 lines
7.8 KiB
Python
234 lines
7.8 KiB
Python
"""Change detection service for comparing alerts between runs."""
|
|
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
from app.config.loader import ChangeThresholds
|
|
from app.models.alerts import AggregatedAlert
|
|
from app.models.state import AlertSnapshot
|
|
from app.utils.logging_config import get_logger
|
|
|
|
|
|
class ChangeType(Enum):
|
|
"""Types of changes that can be detected between runs."""
|
|
|
|
NEW = "new"
|
|
REMOVED = "removed"
|
|
VALUE_CHANGED = "value_changed"
|
|
|
|
|
|
@dataclass
|
|
class AlertChange:
|
|
"""Represents a detected change in an alert."""
|
|
|
|
alert_type: str
|
|
change_type: ChangeType
|
|
description: str
|
|
previous_value: Optional[float] = None
|
|
current_value: Optional[float] = None
|
|
value_delta: Optional[float] = None
|
|
|
|
|
|
@dataclass
|
|
class ChangeReport:
|
|
"""Report of all changes detected between runs."""
|
|
|
|
changes: list[AlertChange] = field(default_factory=list)
|
|
|
|
@property
|
|
def has_changes(self) -> bool:
|
|
"""Check if any changes were detected."""
|
|
return len(self.changes) > 0
|
|
|
|
@property
|
|
def new_alerts(self) -> list[AlertChange]:
|
|
"""Get list of new alert changes."""
|
|
return [c for c in self.changes if c.change_type == ChangeType.NEW]
|
|
|
|
@property
|
|
def removed_alerts(self) -> list[AlertChange]:
|
|
"""Get list of removed alert changes."""
|
|
return [c for c in self.changes if c.change_type == ChangeType.REMOVED]
|
|
|
|
@property
|
|
def value_changes(self) -> list[AlertChange]:
|
|
"""Get list of value change alerts."""
|
|
return [c for c in self.changes if c.change_type == ChangeType.VALUE_CHANGED]
|
|
|
|
def to_prompt_text(self) -> str:
|
|
"""Format change report for LLM prompt."""
|
|
if not self.has_changes:
|
|
return "No significant changes from previous alert."
|
|
|
|
lines = []
|
|
|
|
for change in self.new_alerts:
|
|
lines.append(f"NEW: {change.description}")
|
|
|
|
for change in self.removed_alerts:
|
|
lines.append(f"RESOLVED: {change.description}")
|
|
|
|
for change in self.value_changes:
|
|
lines.append(f"CHANGED: {change.description}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
class ChangeDetector:
|
|
"""Detects changes between current alerts and previous snapshots."""
|
|
|
|
# Map alert types to their value thresholds
|
|
ALERT_TYPE_TO_THRESHOLD_KEY = {
|
|
"temperature_low": "temperature",
|
|
"temperature_high": "temperature",
|
|
"wind_speed": "wind_speed",
|
|
"wind_gust": "wind_gust",
|
|
"precipitation": "precipitation_prob",
|
|
}
|
|
|
|
def __init__(self, thresholds: ChangeThresholds) -> None:
|
|
"""Initialize the change detector.
|
|
|
|
Args:
|
|
thresholds: Thresholds for detecting significant changes.
|
|
"""
|
|
self.thresholds = thresholds
|
|
self.logger = get_logger(__name__)
|
|
|
|
def detect(
|
|
self,
|
|
current_alerts: list[AggregatedAlert],
|
|
previous_snapshots: dict[str, AlertSnapshot],
|
|
) -> ChangeReport:
|
|
"""Detect changes between current alerts and previous snapshots.
|
|
|
|
Args:
|
|
current_alerts: List of current aggregated alerts.
|
|
previous_snapshots: Dict of previous alert snapshots keyed by alert type.
|
|
|
|
Returns:
|
|
ChangeReport containing all detected changes.
|
|
"""
|
|
changes: list[AlertChange] = []
|
|
current_types = {alert.alert_type.value for alert in current_alerts}
|
|
previous_types = set(previous_snapshots.keys())
|
|
|
|
# Detect new alerts
|
|
for alert in current_alerts:
|
|
alert_type = alert.alert_type.value
|
|
if alert_type not in previous_types:
|
|
changes.append(
|
|
AlertChange(
|
|
alert_type=alert_type,
|
|
change_type=ChangeType.NEW,
|
|
description=self._format_new_alert_description(alert),
|
|
current_value=alert.extreme_value,
|
|
)
|
|
)
|
|
self.logger.debug(
|
|
"change_detected_new",
|
|
alert_type=alert_type,
|
|
)
|
|
else:
|
|
# Check for significant value changes
|
|
prev_snapshot = previous_snapshots[alert_type]
|
|
value_change = self._detect_value_change(alert, prev_snapshot)
|
|
if value_change:
|
|
changes.append(value_change)
|
|
self.logger.debug(
|
|
"change_detected_value",
|
|
alert_type=alert_type,
|
|
previous=prev_snapshot.extreme_value,
|
|
current=alert.extreme_value,
|
|
)
|
|
|
|
# Detect removed alerts
|
|
for alert_type in previous_types - current_types:
|
|
prev_snapshot = previous_snapshots[alert_type]
|
|
changes.append(
|
|
AlertChange(
|
|
alert_type=alert_type,
|
|
change_type=ChangeType.REMOVED,
|
|
description=self._format_removed_alert_description(prev_snapshot),
|
|
previous_value=prev_snapshot.extreme_value,
|
|
)
|
|
)
|
|
self.logger.debug(
|
|
"change_detected_removed",
|
|
alert_type=alert_type,
|
|
)
|
|
|
|
report = ChangeReport(changes=changes)
|
|
|
|
self.logger.info(
|
|
"change_detection_complete",
|
|
total_changes=len(changes),
|
|
new_count=len(report.new_alerts),
|
|
removed_count=len(report.removed_alerts),
|
|
value_changes_count=len(report.value_changes),
|
|
)
|
|
|
|
return report
|
|
|
|
def _detect_value_change(
|
|
self,
|
|
current: AggregatedAlert,
|
|
previous: AlertSnapshot,
|
|
) -> Optional[AlertChange]:
|
|
"""Detect if there's a significant value change between current and previous.
|
|
|
|
Args:
|
|
current: Current aggregated alert.
|
|
previous: Previous alert snapshot.
|
|
|
|
Returns:
|
|
AlertChange if significant change detected, None otherwise.
|
|
"""
|
|
threshold_key = self.ALERT_TYPE_TO_THRESHOLD_KEY.get(current.alert_type.value)
|
|
if not threshold_key:
|
|
return None
|
|
|
|
threshold = getattr(self.thresholds, threshold_key, None)
|
|
if threshold is None:
|
|
return None
|
|
|
|
delta = abs(current.extreme_value - previous.extreme_value)
|
|
if delta >= threshold:
|
|
return AlertChange(
|
|
alert_type=current.alert_type.value,
|
|
change_type=ChangeType.VALUE_CHANGED,
|
|
description=self._format_value_change_description(
|
|
current, previous, delta
|
|
),
|
|
previous_value=previous.extreme_value,
|
|
current_value=current.extreme_value,
|
|
value_delta=delta,
|
|
)
|
|
|
|
return None
|
|
|
|
def _format_new_alert_description(self, alert: AggregatedAlert) -> str:
|
|
"""Format description for a new alert."""
|
|
alert_type = alert.alert_type.value.replace("_", " ").title()
|
|
return f"{alert_type} alert: {alert.extreme_value:.0f} at {alert.extreme_hour}"
|
|
|
|
def _format_removed_alert_description(self, snapshot: AlertSnapshot) -> str:
|
|
"""Format description for a removed alert."""
|
|
alert_type = snapshot.alert_type.replace("_", " ").title()
|
|
return f"{alert_type} alert no longer active"
|
|
|
|
def _format_value_change_description(
|
|
self,
|
|
current: AggregatedAlert,
|
|
previous: AlertSnapshot,
|
|
delta: float,
|
|
) -> str:
|
|
"""Format description for a value change."""
|
|
alert_type = current.alert_type.value.replace("_", " ").title()
|
|
direction = "increased" if current.extreme_value > previous.extreme_value else "decreased"
|
|
return (
|
|
f"{alert_type} {direction} from {previous.extreme_value:.0f} "
|
|
f"to {current.extreme_value:.0f}"
|
|
)
|