init commit

This commit is contained in:
2026-01-26 15:08:24 -06:00
commit 67225a725a
33 changed files with 3350 additions and 0 deletions

17
app/models/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
"""Data models for weather alerts."""
from app.models.weather import HourlyForecast, WeatherForecast, WeatherAlert
from app.models.alerts import AggregatedAlert, AlertRules, AlertType, TriggeredAlert
from app.models.state import AlertState, SentAlertRecord
__all__ = [
"HourlyForecast",
"WeatherForecast",
"WeatherAlert",
"AggregatedAlert",
"AlertRules",
"AlertType",
"TriggeredAlert",
"AlertState",
"SentAlertRecord",
]

181
app/models/alerts.py Normal file
View File

@@ -0,0 +1,181 @@
"""Alert rule and triggered alert models."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional
class AlertType(Enum):
"""Types of weather alerts that can be triggered."""
TEMPERATURE_LOW = "temperature_low"
TEMPERATURE_HIGH = "temperature_high"
PRECIPITATION = "precipitation"
WIND_SPEED = "wind_speed"
WIND_GUST = "wind_gust"
SEVERE_WEATHER = "severe_weather"
@dataclass
class TemperatureRule:
"""Temperature alert rule configuration."""
enabled: bool = True
below: Optional[float] = 32
above: Optional[float] = 100
@dataclass
class PrecipitationRule:
"""Precipitation alert rule configuration."""
enabled: bool = True
probability_above: float = 70
@dataclass
class WindRule:
"""Wind alert rule configuration."""
enabled: bool = True
speed_above: float = 25
gust_above: float = 40
@dataclass
class SevereWeatherRule:
"""Severe weather alert rule configuration."""
enabled: bool = True
@dataclass
class AlertRules:
"""Collection of all alert rules."""
temperature: TemperatureRule = field(default_factory=TemperatureRule)
precipitation: PrecipitationRule = field(default_factory=PrecipitationRule)
wind: WindRule = field(default_factory=WindRule)
severe_weather: SevereWeatherRule = field(default_factory=SevereWeatherRule)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AlertRules":
"""Create AlertRules from a configuration dict.
Args:
data: The rules configuration dict.
Returns:
An AlertRules instance.
"""
temp_data = data.get("temperature", {})
precip_data = data.get("precipitation", {})
wind_data = data.get("wind", {})
severe_data = data.get("severe_weather", {})
return cls(
temperature=TemperatureRule(
enabled=temp_data.get("enabled", True),
below=temp_data.get("below", 32),
above=temp_data.get("above", 100),
),
precipitation=PrecipitationRule(
enabled=precip_data.get("enabled", True),
probability_above=precip_data.get("probability_above", 70),
),
wind=WindRule(
enabled=wind_data.get("enabled", True),
speed_above=wind_data.get("speed_above", 25),
gust_above=wind_data.get("gust_above", 40),
),
severe_weather=SevereWeatherRule(
enabled=severe_data.get("enabled", True),
),
)
@dataclass
class TriggeredAlert:
"""Represents an alert that was triggered by a rule evaluation."""
alert_type: AlertType
title: str
message: str
forecast_hour: str
value: float
threshold: float
created_at: datetime = field(default_factory=datetime.now)
@property
def dedup_key(self) -> str:
"""Generate a deduplication key for this alert.
Format: {alert_type}:{forecast_hour}
This allows re-alerting for different time periods while preventing
duplicate alerts for the same hour.
"""
return f"{self.alert_type.value}:{self.forecast_hour}"
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"alert_type": self.alert_type.value,
"title": self.title,
"message": self.message,
"forecast_hour": self.forecast_hour,
"value": self.value,
"threshold": self.threshold,
"created_at": self.created_at.isoformat(),
"dedup_key": self.dedup_key,
}
@dataclass
class AggregatedAlert:
"""Represents multiple alerts of the same type aggregated into one notification."""
alert_type: AlertType
title: str
message: str
triggered_hours: list[str]
start_time: str
end_time: str
extreme_value: float
extreme_hour: str
threshold: float
created_at: datetime = field(default_factory=datetime.now)
@property
def dedup_key(self) -> str:
"""Generate a deduplication key for this aggregated alert.
Format: {alert_type}:{date}
Day-level deduplication prevents re-sending aggregated alerts
for the same alert type on the same day.
"""
# Extract date from the first triggered hour (format: YYYY-MM-DD-HH)
date_part = self.start_time.rsplit("-", 1)[0] if self.start_time else ""
return f"{self.alert_type.value}:{date_part}"
@property
def hour_count(self) -> int:
"""Number of hours that triggered this alert."""
return len(self.triggered_hours)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"alert_type": self.alert_type.value,
"title": self.title,
"message": self.message,
"triggered_hours": self.triggered_hours,
"start_time": self.start_time,
"end_time": self.end_time,
"extreme_value": self.extreme_value,
"extreme_hour": self.extreme_hour,
"threshold": self.threshold,
"created_at": self.created_at.isoformat(),
"dedup_key": self.dedup_key,
"hour_count": self.hour_count,
}

133
app/models/state.py Normal file
View File

@@ -0,0 +1,133 @@
"""State management models for alert deduplication."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass
class SentAlertRecord:
"""Record of a sent alert for deduplication."""
dedup_key: str
alert_type: str
sent_at: datetime
forecast_hour: str
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"dedup_key": self.dedup_key,
"alert_type": self.alert_type,
"sent_at": self.sent_at.isoformat(),
"forecast_hour": self.forecast_hour,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "SentAlertRecord":
"""Create from dictionary.
Args:
data: The serialized record dict.
Returns:
A SentAlertRecord instance.
"""
return cls(
dedup_key=data["dedup_key"],
alert_type=data["alert_type"],
sent_at=datetime.fromisoformat(data["sent_at"]),
forecast_hour=data["forecast_hour"],
)
@dataclass
class AlertState:
"""State container for tracking sent alerts."""
sent_alerts: dict[str, SentAlertRecord] = field(default_factory=dict)
last_updated: datetime = field(default_factory=datetime.now)
def is_duplicate(self, dedup_key: str) -> bool:
"""Check if an alert with this dedup key has already been sent.
Args:
dedup_key: The deduplication key to check.
Returns:
True if this alert has already been sent.
"""
return dedup_key in self.sent_alerts
def record_sent(self, dedup_key: str, alert_type: str, forecast_hour: str) -> None:
"""Record that an alert was sent.
Args:
dedup_key: The deduplication key.
alert_type: The type of alert.
forecast_hour: The forecast hour this alert was for.
"""
self.sent_alerts[dedup_key] = SentAlertRecord(
dedup_key=dedup_key,
alert_type=alert_type,
sent_at=datetime.now(),
forecast_hour=forecast_hour,
)
self.last_updated = datetime.now()
def purge_old_records(self, window_hours: int) -> int:
"""Remove records older than the deduplication window.
Args:
window_hours: Number of hours to retain records.
Returns:
Number of records purged.
"""
cutoff = datetime.now()
original_count = len(self.sent_alerts)
self.sent_alerts = {
key: record
for key, record in self.sent_alerts.items()
if (cutoff - record.sent_at).total_seconds() < (window_hours * 3600)
}
purged = original_count - len(self.sent_alerts)
if purged > 0:
self.last_updated = datetime.now()
return purged
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"sent_alerts": {
key: record.to_dict() for key, record in self.sent_alerts.items()
},
"last_updated": self.last_updated.isoformat(),
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AlertState":
"""Create from dictionary.
Args:
data: The serialized state dict.
Returns:
An AlertState instance.
"""
sent_alerts = {
key: SentAlertRecord.from_dict(record_data)
for key, record_data in data.get("sent_alerts", {}).items()
}
last_updated_str = data.get("last_updated")
last_updated = (
datetime.fromisoformat(last_updated_str)
if last_updated_str
else datetime.now()
)
return cls(sent_alerts=sent_alerts, last_updated=last_updated)

160
app/models/weather.py Normal file
View File

@@ -0,0 +1,160 @@
"""Weather data models for VisualCrossing API responses."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
@dataclass
class HourlyForecast:
"""Represents a single hour's weather forecast."""
datetime_str: str
datetime_epoch: int
temp: float
feelslike: float
humidity: float
precip: float
precip_prob: float
snow: float
snow_depth: float
wind_speed: float
wind_gust: float
wind_dir: float
pressure: float
visibility: float
cloud_cover: float
uv_index: float
conditions: str
icon: str
@property
def datetime(self) -> datetime:
"""Convert epoch to datetime."""
return datetime.fromtimestamp(self.datetime_epoch)
@property
def hour_key(self) -> str:
"""Get a key representing this forecast hour (YYYY-MM-DD-HH)."""
return self.datetime.strftime("%Y-%m-%d-%H")
@classmethod
def from_api_data(cls, data: dict[str, Any]) -> "HourlyForecast":
"""Create an HourlyForecast from VisualCrossing API data.
Args:
data: The hourly data dict from the API response.
Returns:
An HourlyForecast instance.
"""
return cls(
datetime_str=data.get("datetime", ""),
datetime_epoch=data.get("datetimeEpoch", 0),
temp=float(data.get("temp", 0)),
feelslike=float(data.get("feelslike", 0)),
humidity=float(data.get("humidity", 0)),
precip=float(data.get("precip") or 0),
precip_prob=float(data.get("precipprob") or 0),
snow=float(data.get("snow") or 0),
snow_depth=float(data.get("snowdepth") or 0),
wind_speed=float(data.get("windspeed") or 0),
wind_gust=float(data.get("windgust") or 0),
wind_dir=float(data.get("winddir") or 0),
pressure=float(data.get("pressure") or 0),
visibility=float(data.get("visibility") or 0),
cloud_cover=float(data.get("cloudcover") or 0),
uv_index=float(data.get("uvindex") or 0),
conditions=data.get("conditions", ""),
icon=data.get("icon", ""),
)
@dataclass
class WeatherAlert:
"""Represents a severe weather alert from the API."""
event: str
headline: str
description: str
onset: Optional[str]
ends: Optional[str]
id: str
language: str
link: str
@classmethod
def from_api_data(cls, data: dict[str, Any]) -> "WeatherAlert":
"""Create a WeatherAlert from VisualCrossing API data.
Args:
data: The alert data dict from the API response.
Returns:
A WeatherAlert instance.
"""
return cls(
event=data.get("event", "Unknown"),
headline=data.get("headline", ""),
description=data.get("description", ""),
onset=data.get("onset"),
ends=data.get("ends"),
id=data.get("id", ""),
language=data.get("language", "en"),
link=data.get("link", ""),
)
@dataclass
class WeatherForecast:
"""Complete weather forecast response."""
location: str
resolved_address: str
timezone: str
hourly_forecasts: list[HourlyForecast] = field(default_factory=list)
alerts: list[WeatherAlert] = field(default_factory=list)
@classmethod
def from_api_data(
cls,
data: dict[str, Any],
hours_ahead: int = 24,
) -> "WeatherForecast":
"""Create a WeatherForecast from VisualCrossing API data.
Args:
data: The full API response dict.
hours_ahead: Number of hours of forecast to include.
Returns:
A WeatherForecast instance.
"""
# Extract hourly forecasts from days
hourly_forecasts: list[HourlyForecast] = []
now = datetime.now()
for day in data.get("days", []):
for hour_data in day.get("hours", []):
forecast = HourlyForecast.from_api_data(hour_data)
# Only include future hours up to hours_ahead
if forecast.datetime > now:
hourly_forecasts.append(forecast)
if len(hourly_forecasts) >= hours_ahead:
break
if len(hourly_forecasts) >= hours_ahead:
break
# Extract alerts
alerts = [
WeatherAlert.from_api_data(alert_data)
for alert_data in data.get("alerts", [])
]
return cls(
location=data.get("address", ""),
resolved_address=data.get("resolvedAddress", ""),
timezone=data.get("timezone", ""),
hourly_forecasts=hourly_forecasts,
alerts=alerts,
)