From 67225a725a2e064f67e16a7dad377412339449cf Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Mon, 26 Jan 2026 15:08:24 -0600 Subject: [PATCH] init commit --- .env.example | 11 + .gitea/workflows/weather-alerts.yml | 36 +++ .gitignore | 65 ++++++ CLAUDE.md | 67 ++++++ app/__init__.py | 3 + app/config/__init__.py | 5 + app/config/loader.py | 159 +++++++++++++ app/config/settings.example.yaml | 40 ++++ app/config/settings.yaml | 36 +++ app/main.py | 185 ++++++++++++++++ app/models/__init__.py | 17 ++ app/models/alerts.py | 181 +++++++++++++++ app/models/state.py | 133 +++++++++++ app/models/weather.py | 160 ++++++++++++++ app/services/__init__.py | 13 ++ app/services/alert_aggregator.py | 283 ++++++++++++++++++++++++ app/services/notification_service.py | 177 +++++++++++++++ app/services/rule_engine.py | 231 +++++++++++++++++++ app/services/state_manager.py | 185 ++++++++++++++++ app/services/weather_service.py | 101 +++++++++ app/utils/__init__.py | 6 + app/utils/http_client.py | 162 ++++++++++++++ app/utils/logging_config.py | 54 +++++ docs/CLAUDE.md | 20 ++ docs/code_guidelines.md | 120 ++++++++++ docs/implementation_roadmap.md | 164 ++++++++++++++ docs/security.md | 46 ++++ requirements.txt | 6 + run.py | 14 ++ tests/__init__.py | 1 + tests/test_rule_engine.py | 319 +++++++++++++++++++++++++++ tests/test_state_manager.py | 175 +++++++++++++++ tests/test_weather_service.py | 175 +++++++++++++++ 33 files changed, 3350 insertions(+) create mode 100644 .env.example create mode 100644 .gitea/workflows/weather-alerts.yml create mode 100644 .gitignore create mode 100644 CLAUDE.md create mode 100644 app/__init__.py create mode 100644 app/config/__init__.py create mode 100644 app/config/loader.py create mode 100644 app/config/settings.example.yaml create mode 100644 app/config/settings.yaml create mode 100644 app/main.py create mode 100644 app/models/__init__.py create mode 100644 app/models/alerts.py create mode 100644 app/models/state.py create mode 100644 app/models/weather.py create mode 100644 app/services/__init__.py create mode 100644 app/services/alert_aggregator.py create mode 100644 app/services/notification_service.py create mode 100644 app/services/rule_engine.py create mode 100644 app/services/state_manager.py create mode 100644 app/services/weather_service.py create mode 100644 app/utils/__init__.py create mode 100644 app/utils/http_client.py create mode 100644 app/utils/logging_config.py create mode 100644 docs/CLAUDE.md create mode 100644 docs/code_guidelines.md create mode 100644 docs/implementation_roadmap.md create mode 100644 docs/security.md create mode 100644 requirements.txt create mode 100755 run.py create mode 100644 tests/__init__.py create mode 100644 tests/test_rule_engine.py create mode 100644 tests/test_state_manager.py create mode 100644 tests/test_weather_service.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..400fe67 --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# Weather Alerts Environment Variables +# Copy this file to .env and fill in your actual values. +# NEVER commit .env to version control! + +# VisualCrossing Weather API Key +# Get your free key at: https://www.visualcrossing.com/ +VISUALCROSSING_API_KEY=your_api_key_here + +# Ntfy Access Token +# Required if your ntfy server requires authentication +NTFY_ACCESS_TOKEN=your_ntfy_token_here diff --git a/.gitea/workflows/weather-alerts.yml b/.gitea/workflows/weather-alerts.yml new file mode 100644 index 0000000..f968a00 --- /dev/null +++ b/.gitea/workflows/weather-alerts.yml @@ -0,0 +1,36 @@ +name: Weather Alerts + +on: + schedule: + - cron: '0 * * * *' # Every hour at :00 + workflow_dispatch: {} # Manual trigger + +jobs: + check-weather: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + cache: 'pip' + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Run weather alerts + env: + VISUALCROSSING_API_KEY: ${{ secrets.VISUALCROSSING_API_KEY }} + NTFY_ACCESS_TOKEN: ${{ secrets.NTFY_ACCESS_TOKEN }} + run: python -m app.main + + - name: Commit state changes + run: | + git config user.name "Gitea Actions" + git config user.email "actions@gitea.local" + git add data/state.json + git diff --staged --quiet || git commit -m "chore: update weather alert state [skip ci]" + git push diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1092b50 --- /dev/null +++ b/.gitignore @@ -0,0 +1,65 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +ENV/ +env/ +.venv/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Environment and secrets +.env +*.env +.env.local +.env.*.local + +# State and data files +data/ +*.json +!settings.example.yaml + +# Logs +*.log +logs/ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.nox/ + +# Type checking +.mypy_cache/ +.dmypy.json +dmypy.json + +# OS +.DS_Store +Thumbs.db diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..24f1f04 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,67 @@ +# CLAUDE.md + +## Project Overview +Purpose: This project is weather alerting via notify using the weathervisualcrossing API... +Technologies: Python +Description: This project will be built with python, is a single purpose script used to query weather data from weathervisualcrossing.com and send notifications to my ntfy server in the event the weather forcast mets alerting conditions set in the application. + +## High level Flow +Script hits the API for the weather forecast for the next few hours. +Compares the forecast data against the weather notification rules +If there is an alerting condition from the rules, we send a notification to my ntfy server. + +## VisualCrossing API details: +Weather forecast can be obtained from this API using this structure. +https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/viola%2Ctn? + unitGroup=us + &include=days,hours,alerts,current,events + &key= + &contentType=json + +## Notifications: +Notifications will be sent to ntfy.sneakygeek.net +This server requires auth to post messages +The topic will be "weather-alerts" + +## Development Guidelines + +### Project Structure: +I prefer to have my source in app, not src. +I prefer **centralizing common logic** such as HTTP wrappers, logging handlers, validators, etc. +I like keeping things **modular**, with: + * `/app/services/...` + * `/app/utils/...` + * `/app/models/...` + * Typed config loaders + * YAML-driven config structures + + +### Coding standards: +I often follow a pattern **brainstorm → design → code → revise** and like that flow. +Reference `docs/code_guidelines.md` for code standards and rules. + +### Security Standards: +Reference `docs/security.md` for security guidance and standards + +--- +## ⚠️ CLAUDE WORKSPACE BELOW ⚠️ + +**The sections above define my development preferences and standards.** +**Everything below this line is working context for Claude to track project-specific information, decisions, and progress.** + +## Claude Context & Project Notes + +### Project Context + + +### Implementation Notes + + +### Session History + + +### Active Tasks & Notes + + +### Change Log + \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..30bdc4e --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,3 @@ +"""Weather Alerts application package.""" + +__version__ = "1.0.0" diff --git a/app/config/__init__.py b/app/config/__init__.py new file mode 100644 index 0000000..e32c2a9 --- /dev/null +++ b/app/config/__init__.py @@ -0,0 +1,5 @@ +"""Configuration loading and management.""" + +from app.config.loader import load_config, AppConfig + +__all__ = ["load_config", "AppConfig"] diff --git a/app/config/loader.py b/app/config/loader.py new file mode 100644 index 0000000..a39a631 --- /dev/null +++ b/app/config/loader.py @@ -0,0 +1,159 @@ +"""Typed configuration loader with YAML and environment variable support.""" + +import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional + +import yaml +from dotenv import load_dotenv + +from app.models.alerts import AlertRules + + +@dataclass +class AppSettings: + """Application-level settings.""" + + name: str = "weather-alerts" + version: str = "1.0.0" + log_level: str = "INFO" + + +@dataclass +class WeatherSettings: + """Weather API settings.""" + + location: str = "viola,tn" + hours_ahead: int = 24 + unit_group: str = "us" + api_key: str = "" + + +@dataclass +class NtfySettings: + """Ntfy notification settings.""" + + server_url: str = "https://ntfy.sneakygeek.net" + topic: str = "weather-alerts" + priority: str = "high" + tags: list[str] = field(default_factory=lambda: ["cloud", "warning"]) + access_token: str = "" + + +@dataclass +class NotificationSettings: + """Notification settings container.""" + + ntfy: NtfySettings = field(default_factory=NtfySettings) + + +@dataclass +class StateSettings: + """State management settings.""" + + file_path: str = "./data/state.json" + dedup_window_hours: int = 24 + + +@dataclass +class AlertSettings: + """Alert configuration settings.""" + + rules: AlertRules = field(default_factory=AlertRules) + + +@dataclass +class AppConfig: + """Complete application configuration.""" + + app: AppSettings = field(default_factory=AppSettings) + weather: WeatherSettings = field(default_factory=WeatherSettings) + alerts: AlertSettings = field(default_factory=AlertSettings) + notifications: NotificationSettings = field(default_factory=NotificationSettings) + state: StateSettings = field(default_factory=StateSettings) + + +def load_config( + config_path: Optional[str] = None, + env_path: Optional[str] = None, +) -> AppConfig: + """Load configuration from YAML file and environment variables. + + Args: + config_path: Path to the YAML config file. Defaults to app/config/settings.yaml. + env_path: Path to the .env file. Defaults to .env in the project root. + + Returns: + A fully populated AppConfig instance. + """ + # Load environment variables from .env file + if env_path: + load_dotenv(env_path) + else: + load_dotenv() + + # Determine config file path + if config_path is None: + config_path = os.environ.get( + "WEATHER_ALERTS_CONFIG", + str(Path(__file__).parent / "settings.yaml"), + ) + + # Load YAML config + config_data: dict[str, Any] = {} + config_file = Path(config_path) + if config_file.exists(): + with open(config_file) as f: + config_data = yaml.safe_load(f) or {} + + # Build configuration with defaults + app_data = config_data.get("app", {}) + weather_data = config_data.get("weather", {}) + alerts_data = config_data.get("alerts", {}) + notifications_data = config_data.get("notifications", {}) + state_data = config_data.get("state", {}) + + # Build app settings + app_settings = AppSettings( + name=app_data.get("name", "weather-alerts"), + version=app_data.get("version", "1.0.0"), + log_level=app_data.get("log_level", "INFO"), + ) + + # Build weather settings with API key from environment + weather_settings = WeatherSettings( + location=weather_data.get("location", "viola,tn"), + hours_ahead=weather_data.get("hours_ahead", 24), + unit_group=weather_data.get("unit_group", "us"), + api_key=os.environ.get("VISUALCROSSING_API_KEY", ""), + ) + + # Build alert settings + rules_data = alerts_data.get("rules", {}) + alert_settings = AlertSettings(rules=AlertRules.from_dict(rules_data)) + + # Build notification settings with token from environment + ntfy_data = notifications_data.get("ntfy", {}) + ntfy_settings = NtfySettings( + server_url=ntfy_data.get("server_url", "https://ntfy.sneakygeek.net"), + topic=ntfy_data.get("topic", "weather-alerts"), + priority=ntfy_data.get("priority", "high"), + tags=ntfy_data.get("tags", ["cloud", "warning"]), + access_token=os.environ.get("NTFY_ACCESS_TOKEN", ""), + ) + notification_settings = NotificationSettings(ntfy=ntfy_settings) + + # Build state settings + state_settings = StateSettings( + file_path=state_data.get("file_path", "./data/state.json"), + dedup_window_hours=state_data.get("dedup_window_hours", 24), + ) + + return AppConfig( + app=app_settings, + weather=weather_settings, + alerts=alert_settings, + notifications=notification_settings, + state=state_settings, + ) diff --git a/app/config/settings.example.yaml b/app/config/settings.example.yaml new file mode 100644 index 0000000..0beb552 --- /dev/null +++ b/app/config/settings.example.yaml @@ -0,0 +1,40 @@ +# Weather Alerts Configuration Example +# Copy this file to settings.yaml and customize as needed. +# Secrets (API keys, tokens) should be in .env file, not here. + +app: + name: "weather-alerts" + version: "1.0.0" + log_level: "INFO" # DEBUG, INFO, WARNING, ERROR + +weather: + location: "viola,tn" # City,State or ZIP code + hours_ahead: 24 # Number of forecast hours to check + unit_group: "us" # "us" for Fahrenheit/mph, "metric" for Celsius/kph + +alerts: + rules: + temperature: + enabled: true + below: 32 # Alert when temp falls below this (freezing) + above: 100 # Alert when temp exceeds this (extreme heat) + precipitation: + enabled: true + probability_above: 70 # Alert when precipitation chance exceeds this % + wind: + enabled: true + speed_above: 25 # Alert when sustained wind exceeds this (mph) + gust_above: 40 # Alert when wind gusts exceed this (mph) + severe_weather: + enabled: true # Forward severe weather alerts from the API + +notifications: + ntfy: + server_url: "https://ntfy.sneakygeek.net" + topic: "weather-alerts" + priority: "high" # min, low, default, high, urgent + tags: ["cloud", "warning"] # Emoji tags for notification + +state: + file_path: "./data/state.json" + dedup_window_hours: 24 # Don't repeat same alert within this window diff --git a/app/config/settings.yaml b/app/config/settings.yaml new file mode 100644 index 0000000..656225a --- /dev/null +++ b/app/config/settings.yaml @@ -0,0 +1,36 @@ +app: + name: "weather-alerts" + version: "1.0.0" + log_level: "INFO" + +weather: + location: "viola,tn" + hours_ahead: 24 + unit_group: "us" + +alerts: + rules: + temperature: + enabled: true + below: 32 + above: 95 + precipitation: + enabled: true + probability_above: 60 + wind: + enabled: true + speed_above: 25 + gust_above: 30 + severe_weather: + enabled: true + +notifications: + ntfy: + server_url: "https://ntfy.sneakygeek.net" + topic: "weather-alerts" + priority: "high" + tags: ["cloud", "warning"] + +state: + file_path: "./data/state.json" + dedup_window_hours: 24 diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..cb5e93c --- /dev/null +++ b/app/main.py @@ -0,0 +1,185 @@ +"""Main orchestration module for weather alerts application.""" + +import sys +from typing import Optional + +from app.config.loader import AppConfig, load_config +from app.services.alert_aggregator import AlertAggregator +from app.services.notification_service import NotificationService +from app.services.rule_engine import RuleEngine +from app.services.state_manager import StateManager +from app.services.weather_service import WeatherService, WeatherServiceError +from app.utils.http_client import HttpClient +from app.utils.logging_config import configure_logging, get_logger + + +class WeatherAlertsApp: + """Main application class for weather alerts.""" + + def __init__(self, config: AppConfig) -> None: + """Initialize the application with configuration. + + Args: + config: The application configuration. + """ + self.config = config + self.logger = get_logger(__name__) + + # Initialize HTTP client (shared across services) + self.http_client = HttpClient() + + # Initialize services + self.weather_service = WeatherService( + api_key=config.weather.api_key, + http_client=self.http_client, + ) + + self.rule_engine = RuleEngine(rules=config.alerts.rules) + + self.state_manager = StateManager( + file_path=config.state.file_path, + dedup_window_hours=config.state.dedup_window_hours, + ) + + self.alert_aggregator = AlertAggregator() + + self.notification_service = NotificationService( + server_url=config.notifications.ntfy.server_url, + topic=config.notifications.ntfy.topic, + access_token=config.notifications.ntfy.access_token, + priority=config.notifications.ntfy.priority, + default_tags=config.notifications.ntfy.tags, + http_client=self.http_client, + ) + + def run(self) -> int: + """Execute the main application flow. + + Returns: + Exit code (0 for success, 1 for error). + """ + self.logger.info( + "app_starting", + version=self.config.app.version, + location=self.config.weather.location, + ) + + try: + # Step 1: Fetch weather forecast + self.logger.info("step_fetch_forecast") + forecast = self.weather_service.get_forecast( + location=self.config.weather.location, + hours_ahead=self.config.weather.hours_ahead, + unit_group=self.config.weather.unit_group, + ) + + # Step 2: Evaluate rules against forecast + self.logger.info("step_evaluate_rules") + triggered_alerts = self.rule_engine.evaluate(forecast) + + if not triggered_alerts: + self.logger.info("no_alerts_triggered") + return 0 + + self.logger.info( + "alerts_triggered", + count=len(triggered_alerts), + ) + + # Step 2.5: Aggregate alerts by type + self.logger.info("step_aggregate_alerts") + aggregated_alerts = self.alert_aggregator.aggregate(triggered_alerts) + + self.logger.info( + "alerts_aggregated", + input_count=len(triggered_alerts), + output_count=len(aggregated_alerts), + ) + + # Step 3: Filter duplicates + self.logger.info("step_filter_duplicates") + new_alerts = self.state_manager.filter_duplicates(aggregated_alerts) + + if not new_alerts: + self.logger.info("all_alerts_are_duplicates") + return 0 + + # Step 4: Send notifications + self.logger.info( + "step_send_notifications", + count=len(new_alerts), + ) + results = self.notification_service.send_batch(new_alerts) + + # Step 5: Record sent alerts + self.logger.info("step_record_sent") + for result in results: + if result.success: + self.state_manager.record_sent(result.alert) + + # Step 6: Purge old records and save state + self.state_manager.purge_old_records() + self.state_manager.save() + + # Report results + success_count = sum(1 for r in results if r.success) + failed_count = len(results) - success_count + + self.logger.info( + "app_complete", + alerts_sent=success_count, + alerts_failed=failed_count, + ) + + return 0 if failed_count == 0 else 1 + + except WeatherServiceError as e: + self.logger.error("weather_service_error", error=str(e)) + return 1 + + except Exception as e: + self.logger.exception("unexpected_error", error=str(e)) + return 1 + + finally: + self.http_client.close() + + +def main(config_path: Optional[str] = None) -> int: + """Main entry point for the application. + + Args: + config_path: Optional path to configuration file. + + Returns: + Exit code (0 for success, 1 for error). + """ + # Load configuration + try: + config = load_config(config_path) + except Exception as e: + print(f"Failed to load configuration: {e}", file=sys.stderr) + return 1 + + # Configure logging + configure_logging(config.app.log_level) + logger = get_logger(__name__) + + # Validate required secrets + if not config.weather.api_key: + logger.error("missing_api_key", hint="Set VISUALCROSSING_API_KEY environment variable") + return 1 + + if not config.notifications.ntfy.access_token: + logger.warning( + "missing_ntfy_token", + hint="Set NTFY_ACCESS_TOKEN if your server requires auth", + ) + + # Run the application + app = WeatherAlertsApp(config) + return app.run() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..3a50695 --- /dev/null +++ b/app/models/__init__.py @@ -0,0 +1,17 @@ +"""Data models for weather alerts.""" + +from app.models.weather import HourlyForecast, WeatherForecast, WeatherAlert +from app.models.alerts import AggregatedAlert, AlertRules, AlertType, TriggeredAlert +from app.models.state import AlertState, SentAlertRecord + +__all__ = [ + "HourlyForecast", + "WeatherForecast", + "WeatherAlert", + "AggregatedAlert", + "AlertRules", + "AlertType", + "TriggeredAlert", + "AlertState", + "SentAlertRecord", +] diff --git a/app/models/alerts.py b/app/models/alerts.py new file mode 100644 index 0000000..d42f2de --- /dev/null +++ b/app/models/alerts.py @@ -0,0 +1,181 @@ +"""Alert rule and triggered alert models.""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Optional + + +class AlertType(Enum): + """Types of weather alerts that can be triggered.""" + + TEMPERATURE_LOW = "temperature_low" + TEMPERATURE_HIGH = "temperature_high" + PRECIPITATION = "precipitation" + WIND_SPEED = "wind_speed" + WIND_GUST = "wind_gust" + SEVERE_WEATHER = "severe_weather" + + +@dataclass +class TemperatureRule: + """Temperature alert rule configuration.""" + + enabled: bool = True + below: Optional[float] = 32 + above: Optional[float] = 100 + + +@dataclass +class PrecipitationRule: + """Precipitation alert rule configuration.""" + + enabled: bool = True + probability_above: float = 70 + + +@dataclass +class WindRule: + """Wind alert rule configuration.""" + + enabled: bool = True + speed_above: float = 25 + gust_above: float = 40 + + +@dataclass +class SevereWeatherRule: + """Severe weather alert rule configuration.""" + + enabled: bool = True + + +@dataclass +class AlertRules: + """Collection of all alert rules.""" + + temperature: TemperatureRule = field(default_factory=TemperatureRule) + precipitation: PrecipitationRule = field(default_factory=PrecipitationRule) + wind: WindRule = field(default_factory=WindRule) + severe_weather: SevereWeatherRule = field(default_factory=SevereWeatherRule) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "AlertRules": + """Create AlertRules from a configuration dict. + + Args: + data: The rules configuration dict. + + Returns: + An AlertRules instance. + """ + temp_data = data.get("temperature", {}) + precip_data = data.get("precipitation", {}) + wind_data = data.get("wind", {}) + severe_data = data.get("severe_weather", {}) + + return cls( + temperature=TemperatureRule( + enabled=temp_data.get("enabled", True), + below=temp_data.get("below", 32), + above=temp_data.get("above", 100), + ), + precipitation=PrecipitationRule( + enabled=precip_data.get("enabled", True), + probability_above=precip_data.get("probability_above", 70), + ), + wind=WindRule( + enabled=wind_data.get("enabled", True), + speed_above=wind_data.get("speed_above", 25), + gust_above=wind_data.get("gust_above", 40), + ), + severe_weather=SevereWeatherRule( + enabled=severe_data.get("enabled", True), + ), + ) + + +@dataclass +class TriggeredAlert: + """Represents an alert that was triggered by a rule evaluation.""" + + alert_type: AlertType + title: str + message: str + forecast_hour: str + value: float + threshold: float + created_at: datetime = field(default_factory=datetime.now) + + @property + def dedup_key(self) -> str: + """Generate a deduplication key for this alert. + + Format: {alert_type}:{forecast_hour} + This allows re-alerting for different time periods while preventing + duplicate alerts for the same hour. + """ + return f"{self.alert_type.value}:{self.forecast_hour}" + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "alert_type": self.alert_type.value, + "title": self.title, + "message": self.message, + "forecast_hour": self.forecast_hour, + "value": self.value, + "threshold": self.threshold, + "created_at": self.created_at.isoformat(), + "dedup_key": self.dedup_key, + } + + +@dataclass +class AggregatedAlert: + """Represents multiple alerts of the same type aggregated into one notification.""" + + alert_type: AlertType + title: str + message: str + triggered_hours: list[str] + start_time: str + end_time: str + extreme_value: float + extreme_hour: str + threshold: float + created_at: datetime = field(default_factory=datetime.now) + + @property + def dedup_key(self) -> str: + """Generate a deduplication key for this aggregated alert. + + Format: {alert_type}:{date} + Day-level deduplication prevents re-sending aggregated alerts + for the same alert type on the same day. + """ + # Extract date from the first triggered hour (format: YYYY-MM-DD-HH) + date_part = self.start_time.rsplit("-", 1)[0] if self.start_time else "" + return f"{self.alert_type.value}:{date_part}" + + @property + def hour_count(self) -> int: + """Number of hours that triggered this alert.""" + return len(self.triggered_hours) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "alert_type": self.alert_type.value, + "title": self.title, + "message": self.message, + "triggered_hours": self.triggered_hours, + "start_time": self.start_time, + "end_time": self.end_time, + "extreme_value": self.extreme_value, + "extreme_hour": self.extreme_hour, + "threshold": self.threshold, + "created_at": self.created_at.isoformat(), + "dedup_key": self.dedup_key, + "hour_count": self.hour_count, + } diff --git a/app/models/state.py b/app/models/state.py new file mode 100644 index 0000000..833e5f1 --- /dev/null +++ b/app/models/state.py @@ -0,0 +1,133 @@ +"""State management models for alert deduplication.""" + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + + +@dataclass +class SentAlertRecord: + """Record of a sent alert for deduplication.""" + + dedup_key: str + alert_type: str + sent_at: datetime + forecast_hour: str + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + return { + "dedup_key": self.dedup_key, + "alert_type": self.alert_type, + "sent_at": self.sent_at.isoformat(), + "forecast_hour": self.forecast_hour, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "SentAlertRecord": + """Create from dictionary. + + Args: + data: The serialized record dict. + + Returns: + A SentAlertRecord instance. + """ + return cls( + dedup_key=data["dedup_key"], + alert_type=data["alert_type"], + sent_at=datetime.fromisoformat(data["sent_at"]), + forecast_hour=data["forecast_hour"], + ) + + +@dataclass +class AlertState: + """State container for tracking sent alerts.""" + + sent_alerts: dict[str, SentAlertRecord] = field(default_factory=dict) + last_updated: datetime = field(default_factory=datetime.now) + + def is_duplicate(self, dedup_key: str) -> bool: + """Check if an alert with this dedup key has already been sent. + + Args: + dedup_key: The deduplication key to check. + + Returns: + True if this alert has already been sent. + """ + return dedup_key in self.sent_alerts + + def record_sent(self, dedup_key: str, alert_type: str, forecast_hour: str) -> None: + """Record that an alert was sent. + + Args: + dedup_key: The deduplication key. + alert_type: The type of alert. + forecast_hour: The forecast hour this alert was for. + """ + self.sent_alerts[dedup_key] = SentAlertRecord( + dedup_key=dedup_key, + alert_type=alert_type, + sent_at=datetime.now(), + forecast_hour=forecast_hour, + ) + self.last_updated = datetime.now() + + def purge_old_records(self, window_hours: int) -> int: + """Remove records older than the deduplication window. + + Args: + window_hours: Number of hours to retain records. + + Returns: + Number of records purged. + """ + cutoff = datetime.now() + original_count = len(self.sent_alerts) + + self.sent_alerts = { + key: record + for key, record in self.sent_alerts.items() + if (cutoff - record.sent_at).total_seconds() < (window_hours * 3600) + } + + purged = original_count - len(self.sent_alerts) + if purged > 0: + self.last_updated = datetime.now() + + return purged + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + return { + "sent_alerts": { + key: record.to_dict() for key, record in self.sent_alerts.items() + }, + "last_updated": self.last_updated.isoformat(), + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "AlertState": + """Create from dictionary. + + Args: + data: The serialized state dict. + + Returns: + An AlertState instance. + """ + sent_alerts = { + key: SentAlertRecord.from_dict(record_data) + for key, record_data in data.get("sent_alerts", {}).items() + } + + last_updated_str = data.get("last_updated") + last_updated = ( + datetime.fromisoformat(last_updated_str) + if last_updated_str + else datetime.now() + ) + + return cls(sent_alerts=sent_alerts, last_updated=last_updated) diff --git a/app/models/weather.py b/app/models/weather.py new file mode 100644 index 0000000..1745052 --- /dev/null +++ b/app/models/weather.py @@ -0,0 +1,160 @@ +"""Weather data models for VisualCrossing API responses.""" + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Optional + + +@dataclass +class HourlyForecast: + """Represents a single hour's weather forecast.""" + + datetime_str: str + datetime_epoch: int + temp: float + feelslike: float + humidity: float + precip: float + precip_prob: float + snow: float + snow_depth: float + wind_speed: float + wind_gust: float + wind_dir: float + pressure: float + visibility: float + cloud_cover: float + uv_index: float + conditions: str + icon: str + + @property + def datetime(self) -> datetime: + """Convert epoch to datetime.""" + return datetime.fromtimestamp(self.datetime_epoch) + + @property + def hour_key(self) -> str: + """Get a key representing this forecast hour (YYYY-MM-DD-HH).""" + return self.datetime.strftime("%Y-%m-%d-%H") + + @classmethod + def from_api_data(cls, data: dict[str, Any]) -> "HourlyForecast": + """Create an HourlyForecast from VisualCrossing API data. + + Args: + data: The hourly data dict from the API response. + + Returns: + An HourlyForecast instance. + """ + return cls( + datetime_str=data.get("datetime", ""), + datetime_epoch=data.get("datetimeEpoch", 0), + temp=float(data.get("temp", 0)), + feelslike=float(data.get("feelslike", 0)), + humidity=float(data.get("humidity", 0)), + precip=float(data.get("precip") or 0), + precip_prob=float(data.get("precipprob") or 0), + snow=float(data.get("snow") or 0), + snow_depth=float(data.get("snowdepth") or 0), + wind_speed=float(data.get("windspeed") or 0), + wind_gust=float(data.get("windgust") or 0), + wind_dir=float(data.get("winddir") or 0), + pressure=float(data.get("pressure") or 0), + visibility=float(data.get("visibility") or 0), + cloud_cover=float(data.get("cloudcover") or 0), + uv_index=float(data.get("uvindex") or 0), + conditions=data.get("conditions", ""), + icon=data.get("icon", ""), + ) + + +@dataclass +class WeatherAlert: + """Represents a severe weather alert from the API.""" + + event: str + headline: str + description: str + onset: Optional[str] + ends: Optional[str] + id: str + language: str + link: str + + @classmethod + def from_api_data(cls, data: dict[str, Any]) -> "WeatherAlert": + """Create a WeatherAlert from VisualCrossing API data. + + Args: + data: The alert data dict from the API response. + + Returns: + A WeatherAlert instance. + """ + return cls( + event=data.get("event", "Unknown"), + headline=data.get("headline", ""), + description=data.get("description", ""), + onset=data.get("onset"), + ends=data.get("ends"), + id=data.get("id", ""), + language=data.get("language", "en"), + link=data.get("link", ""), + ) + + +@dataclass +class WeatherForecast: + """Complete weather forecast response.""" + + location: str + resolved_address: str + timezone: str + hourly_forecasts: list[HourlyForecast] = field(default_factory=list) + alerts: list[WeatherAlert] = field(default_factory=list) + + @classmethod + def from_api_data( + cls, + data: dict[str, Any], + hours_ahead: int = 24, + ) -> "WeatherForecast": + """Create a WeatherForecast from VisualCrossing API data. + + Args: + data: The full API response dict. + hours_ahead: Number of hours of forecast to include. + + Returns: + A WeatherForecast instance. + """ + # Extract hourly forecasts from days + hourly_forecasts: list[HourlyForecast] = [] + now = datetime.now() + + for day in data.get("days", []): + for hour_data in day.get("hours", []): + forecast = HourlyForecast.from_api_data(hour_data) + # Only include future hours up to hours_ahead + if forecast.datetime > now: + hourly_forecasts.append(forecast) + if len(hourly_forecasts) >= hours_ahead: + break + if len(hourly_forecasts) >= hours_ahead: + break + + # Extract alerts + alerts = [ + WeatherAlert.from_api_data(alert_data) + for alert_data in data.get("alerts", []) + ] + + return cls( + location=data.get("address", ""), + resolved_address=data.get("resolvedAddress", ""), + timezone=data.get("timezone", ""), + hourly_forecasts=hourly_forecasts, + alerts=alerts, + ) diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..18c5246 --- /dev/null +++ b/app/services/__init__.py @@ -0,0 +1,13 @@ +"""Service layer for weather alerts.""" + +from app.services.weather_service import WeatherService +from app.services.notification_service import NotificationService +from app.services.rule_engine import RuleEngine +from app.services.state_manager import StateManager + +__all__ = [ + "WeatherService", + "NotificationService", + "RuleEngine", + "StateManager", +] diff --git a/app/services/alert_aggregator.py b/app/services/alert_aggregator.py new file mode 100644 index 0000000..20f06b3 --- /dev/null +++ b/app/services/alert_aggregator.py @@ -0,0 +1,283 @@ +"""Alert aggregator service for combining multiple alerts of the same type.""" + +from collections import defaultdict +from datetime import datetime + +from app.models.alerts import AggregatedAlert, AlertType, TriggeredAlert +from app.utils.logging_config import get_logger + + +class AlertAggregator: + """Aggregates multiple alerts of the same type into single notifications.""" + + # Alert types where lower values are worse (e.g., low temperature) + LOWER_IS_WORSE: set[AlertType] = {AlertType.TEMPERATURE_LOW} + + def __init__(self) -> None: + """Initialize the alert aggregator.""" + self.logger = get_logger(__name__) + + def aggregate( + self, + alerts: list[TriggeredAlert], + ) -> list[AggregatedAlert]: + """Aggregate alerts by type into summary notifications. + + Severe weather alerts pass through unchanged. + Other alert types are grouped and combined into a single notification + per type with time range and extreme value information. + + Args: + alerts: List of triggered alerts to aggregate. + + Returns: + List of aggregated alerts (one per alert type). + """ + if not alerts: + return [] + + # Separate severe weather alerts from regular hourly alerts + severe_alerts: list[TriggeredAlert] = [] + hourly_alerts: list[TriggeredAlert] = [] + + for alert in alerts: + if alert.alert_type == AlertType.SEVERE_WEATHER: + severe_alerts.append(alert) + else: + hourly_alerts.append(alert) + + aggregated: list[AggregatedAlert] = [] + + # Convert severe weather alerts to AggregatedAlert (pass through) + aggregated.extend(self._convert_severe_alerts(severe_alerts)) + + # Aggregate hourly alerts by type + aggregated.extend(self._aggregate_by_type(hourly_alerts)) + + self.logger.info( + "alerts_aggregated", + input_count=len(alerts), + output_count=len(aggregated), + severe_count=len(severe_alerts), + ) + + return aggregated + + def _convert_severe_alerts( + self, + alerts: list[TriggeredAlert], + ) -> list[AggregatedAlert]: + """Convert severe weather alerts to AggregatedAlert format. + + Severe weather alerts are not aggregated - each one becomes + its own AggregatedAlert for individual notification. + + Args: + alerts: List of severe weather triggered alerts. + + Returns: + List of AggregatedAlert, one per severe weather alert. + """ + return [ + AggregatedAlert( + alert_type=alert.alert_type, + title=alert.title, + message=alert.message, + triggered_hours=[alert.forecast_hour], + start_time=alert.forecast_hour, + end_time=alert.forecast_hour, + extreme_value=alert.value, + extreme_hour=alert.forecast_hour, + threshold=alert.threshold, + created_at=alert.created_at, + ) + for alert in alerts + ] + + def _aggregate_by_type( + self, + alerts: list[TriggeredAlert], + ) -> list[AggregatedAlert]: + """Aggregate hourly alerts by alert type. + + Args: + alerts: List of hourly triggered alerts. + + Returns: + List of AggregatedAlert, one per alert type. + """ + # Group alerts by type + by_type: dict[AlertType, list[TriggeredAlert]] = defaultdict(list) + + for alert in alerts: + by_type[alert.alert_type].append(alert) + + aggregated: list[AggregatedAlert] = [] + + for alert_type, type_alerts in by_type.items(): + aggregated_alert = self._aggregate_type_group(alert_type, type_alerts) + aggregated.append(aggregated_alert) + + return aggregated + + def _aggregate_type_group( + self, + alert_type: AlertType, + alerts: list[TriggeredAlert], + ) -> AggregatedAlert: + """Create a single AggregatedAlert from a group of same-type alerts. + + Args: + alert_type: The type of all alerts in the group. + alerts: List of alerts of the same type. + + Returns: + A single AggregatedAlert summarizing the group. + """ + # Sort by forecast hour for chronological ordering + sorted_alerts = sorted(alerts, key=lambda a: a.forecast_hour) + + # Collect all triggered hours + triggered_hours = [a.forecast_hour for a in sorted_alerts] + start_time = sorted_alerts[0].forecast_hour + end_time = sorted_alerts[-1].forecast_hour + + # Find extreme value (lowest for low temp, highest for others) + if alert_type in self.LOWER_IS_WORSE: + extreme_alert = min(sorted_alerts, key=lambda a: a.value) + else: + extreme_alert = max(sorted_alerts, key=lambda a: a.value) + + extreme_value = extreme_alert.value + extreme_hour = extreme_alert.forecast_hour + threshold = sorted_alerts[0].threshold # Same for all alerts of same type + + # Build summary message + message = self._build_summary_message( + alert_type=alert_type, + start_time=start_time, + end_time=end_time, + extreme_value=extreme_value, + extreme_hour=extreme_hour, + threshold=threshold, + hour_count=len(sorted_alerts), + ) + + # Build title + title = self._build_title(alert_type) + + return AggregatedAlert( + alert_type=alert_type, + title=title, + message=message, + triggered_hours=triggered_hours, + start_time=start_time, + end_time=end_time, + extreme_value=extreme_value, + extreme_hour=extreme_hour, + threshold=threshold, + ) + + def _build_title(self, alert_type: AlertType) -> str: + """Build a title for the aggregated alert. + + Args: + alert_type: The type of alert. + + Returns: + Title string. + """ + titles = { + AlertType.TEMPERATURE_LOW: "Low Temperature Alert", + AlertType.TEMPERATURE_HIGH: "High Temperature Alert", + AlertType.PRECIPITATION: "Precipitation Alert", + AlertType.WIND_SPEED: "High Wind Alert", + AlertType.WIND_GUST: "Wind Gust Alert", + } + return titles.get(alert_type, f"{alert_type.value} Alert") + + def _build_summary_message( + self, + alert_type: AlertType, + start_time: str, + end_time: str, + extreme_value: float, + extreme_hour: str, + threshold: float, + hour_count: int, + ) -> str: + """Build a summary message for the aggregated alert. + + Args: + alert_type: The type of alert. + start_time: First hour that triggered (YYYY-MM-DD-HH format). + end_time: Last hour that triggered (YYYY-MM-DD-HH format). + extreme_value: The most extreme value recorded. + extreme_hour: Hour when extreme value occurred. + threshold: The threshold that was exceeded. + hour_count: Number of hours that triggered. + + Returns: + Human-readable summary message. + """ + # Format times for display + start_display = self._format_hour_display(start_time) + end_display = self._format_hour_display(end_time) + extreme_display = self._format_hour_display(extreme_hour) + + # Build type-specific message + if alert_type == AlertType.TEMPERATURE_LOW: + return ( + f"Low temps from {start_display} - {end_display}. " + f"Lowest: {extreme_value:.0f}°F at {extreme_display}. " + f"({hour_count} hours below {threshold:.0f}°F)" + ) + + elif alert_type == AlertType.TEMPERATURE_HIGH: + return ( + f"High temps from {start_display} - {end_display}. " + f"Highest: {extreme_value:.0f}°F at {extreme_display}. " + f"({hour_count} hours above {threshold:.0f}°F)" + ) + + elif alert_type == AlertType.PRECIPITATION: + return ( + f"Precipitation likely from {start_display} - {end_display}. " + f"Peak: {extreme_value:.0f}% at {extreme_display}. " + f"({hour_count} hours above {threshold:.0f}%)" + ) + + elif alert_type == AlertType.WIND_SPEED: + return ( + f"High winds from {start_display} - {end_display}. " + f"Peak: {extreme_value:.0f} mph at {extreme_display}. " + f"({hour_count} hours above {threshold:.0f} mph)" + ) + + elif alert_type == AlertType.WIND_GUST: + return ( + f"Wind gusts from {start_display} - {end_display}. " + f"Peak: {extreme_value:.0f} mph at {extreme_display}. " + f"({hour_count} hours above {threshold:.0f} mph)" + ) + + # Fallback for unknown types + return ( + f"Alert from {start_display} - {end_display}. " + f"({hour_count} hours affected)" + ) + + def _format_hour_display(self, hour_key: str) -> str: + """Format an hour key for human display. + + Args: + hour_key: Hour key in YYYY-MM-DD-HH format. + + Returns: + Human-readable time string (e.g., "3 PM" or "6 AM"). + """ + try: + dt = datetime.strptime(hour_key, "%Y-%m-%d-%H") + return dt.strftime("%-I %p") + except ValueError: + return hour_key diff --git a/app/services/notification_service.py b/app/services/notification_service.py new file mode 100644 index 0000000..67fbf83 --- /dev/null +++ b/app/services/notification_service.py @@ -0,0 +1,177 @@ +"""Notification service for sending alerts via ntfy.""" + +from dataclasses import dataclass +from typing import Optional, Union + +from app.models.alerts import AggregatedAlert, AlertType, TriggeredAlert +from app.utils.http_client import HttpClient +from app.utils.logging_config import get_logger + +# Type alias for alerts that can be sent +SendableAlert = Union[TriggeredAlert, AggregatedAlert] + + +@dataclass +class NotificationResult: + """Result of sending a notification.""" + + alert: SendableAlert + success: bool + error: Optional[str] = None + + +class NotificationServiceError(Exception): + """Raised when notification service encounters an error.""" + + pass + + +class NotificationService: + """Service for sending notifications via ntfy.""" + + # Map alert types to emoji tags + ALERT_TYPE_TAGS: dict[AlertType, list[str]] = { + AlertType.TEMPERATURE_LOW: ["cold_face", "thermometer"], + AlertType.TEMPERATURE_HIGH: ["hot_face", "thermometer"], + AlertType.PRECIPITATION: ["cloud_with_rain", "umbrella"], + AlertType.WIND_SPEED: ["wind_face", "dash"], + AlertType.WIND_GUST: ["tornado", "dash"], + AlertType.SEVERE_WEATHER: ["rotating_light", "warning"], + } + + def __init__( + self, + server_url: str, + topic: str, + access_token: str = "", + priority: str = "high", + default_tags: Optional[list[str]] = None, + http_client: Optional[HttpClient] = None, + ) -> None: + """Initialize the notification service. + + Args: + server_url: The ntfy server URL. + topic: The topic to publish to. + access_token: Optional bearer token for authentication. + priority: Default notification priority. + default_tags: Default tags to include with notifications. + http_client: Optional HTTP client instance. + """ + self.server_url = server_url.rstrip("/") + self.topic = topic + self.access_token = access_token + self.priority = priority + self.default_tags = default_tags or ["cloud", "warning"] + self.http_client = http_client or HttpClient() + self.logger = get_logger(__name__) + + def send(self, alert: SendableAlert) -> NotificationResult: + """Send a single alert notification. + + Args: + alert: The triggered or aggregated alert to send. + + Returns: + NotificationResult indicating success or failure. + """ + url = f"{self.server_url}/{self.topic}" + + # Build headers + headers = { + "Title": alert.title, + "Priority": self._get_priority(alert), + "Tags": ",".join(self._get_tags(alert)), + } + + if self.access_token: + headers["Authorization"] = f"Bearer {self.access_token}" + + self.logger.debug( + "sending_notification", + alert_type=alert.alert_type.value, + title=alert.title, + ) + + response = self.http_client.post( + url, + data=alert.message.encode("utf-8"), + headers=headers, + ) + + if response.success: + self.logger.info( + "notification_sent", + alert_type=alert.alert_type.value, + dedup_key=alert.dedup_key, + ) + return NotificationResult(alert=alert, success=True) + else: + error_msg = f"HTTP {response.status_code}: {response.text[:100]}" + self.logger.error( + "notification_failed", + alert_type=alert.alert_type.value, + error=error_msg, + ) + return NotificationResult(alert=alert, success=False, error=error_msg) + + def send_batch( + self, + alerts: list[SendableAlert], + ) -> list[NotificationResult]: + """Send multiple alert notifications. + + Args: + alerts: List of triggered or aggregated alerts to send. + + Returns: + List of NotificationResult for each alert. + """ + if not alerts: + self.logger.info("no_alerts_to_send") + return [] + + results: list[NotificationResult] = [] + + for alert in alerts: + result = self.send(alert) + results.append(result) + + success_count = sum(1 for r in results if r.success) + self.logger.info( + "batch_send_complete", + total=len(alerts), + success=success_count, + failed=len(alerts) - success_count, + ) + + return results + + def _get_priority(self, alert: SendableAlert) -> str: + """Determine notification priority based on alert type. + + Args: + alert: The triggered or aggregated alert. + + Returns: + Priority string for ntfy. + """ + # Severe weather always gets urgent priority + if alert.alert_type == AlertType.SEVERE_WEATHER: + return "urgent" + + return self.priority + + def _get_tags(self, alert: SendableAlert) -> list[str]: + """Get notification tags for an alert. + + Args: + alert: The triggered or aggregated alert. + + Returns: + List of emoji tags for ntfy. + """ + # Start with alert-specific tags + tags = list(self.ALERT_TYPE_TAGS.get(alert.alert_type, self.default_tags)) + + return tags diff --git a/app/services/rule_engine.py b/app/services/rule_engine.py new file mode 100644 index 0000000..bc3c528 --- /dev/null +++ b/app/services/rule_engine.py @@ -0,0 +1,231 @@ +"""Rule engine for evaluating weather conditions against alert rules.""" + +from typing import Optional + +from app.models.alerts import AlertRules, AlertType, TriggeredAlert +from app.models.weather import HourlyForecast, WeatherAlert, WeatherForecast +from app.utils.logging_config import get_logger + + +class RuleEngine: + """Evaluates weather forecasts against configured alert rules.""" + + def __init__(self, rules: AlertRules) -> None: + """Initialize the rule engine. + + Args: + rules: The alert rules configuration. + """ + self.rules = rules + self.logger = get_logger(__name__) + + def evaluate(self, forecast: WeatherForecast) -> list[TriggeredAlert]: + """Evaluate a forecast against all enabled rules. + + Args: + forecast: The weather forecast to evaluate. + + Returns: + List of triggered alerts. + """ + alerts: list[TriggeredAlert] = [] + + # Evaluate hourly forecasts + for hourly in forecast.hourly_forecasts: + alerts.extend(self._evaluate_hourly(hourly)) + + # Evaluate severe weather alerts from API + if self.rules.severe_weather.enabled: + alerts.extend(self._evaluate_severe_alerts(forecast.alerts)) + + self.logger.info( + "rules_evaluated", + hourly_count=len(forecast.hourly_forecasts), + triggered_count=len(alerts), + ) + + return alerts + + def _evaluate_hourly(self, hourly: HourlyForecast) -> list[TriggeredAlert]: + """Evaluate a single hourly forecast against rules. + + Args: + hourly: The hourly forecast data. + + Returns: + List of triggered alerts for this hour. + """ + alerts: list[TriggeredAlert] = [] + + # Temperature rules + if self.rules.temperature.enabled: + alert = self._check_temperature(hourly) + if alert: + alerts.append(alert) + + # Precipitation rules + if self.rules.precipitation.enabled: + alert = self._check_precipitation(hourly) + if alert: + alerts.append(alert) + + # Wind rules + if self.rules.wind.enabled: + wind_alerts = self._check_wind(hourly) + alerts.extend(wind_alerts) + + return alerts + + def _check_temperature( + self, + hourly: HourlyForecast, + ) -> Optional[TriggeredAlert]: + """Check temperature thresholds. + + Args: + hourly: The hourly forecast data. + + Returns: + TriggeredAlert if threshold exceeded, None otherwise. + """ + temp_rule = self.rules.temperature + + # Check low temperature + if temp_rule.below is not None and hourly.temp < temp_rule.below: + return TriggeredAlert( + alert_type=AlertType.TEMPERATURE_LOW, + title="Low Temperature Alert", + message=( + f"Temperature expected to drop to {hourly.temp:.0f}°F " + f"at {hourly.datetime.strftime('%I:%M %p on %b %d')}. " + f"Threshold: {temp_rule.below:.0f}°F" + ), + forecast_hour=hourly.hour_key, + value=hourly.temp, + threshold=temp_rule.below, + ) + + # Check high temperature + if temp_rule.above is not None and hourly.temp > temp_rule.above: + return TriggeredAlert( + alert_type=AlertType.TEMPERATURE_HIGH, + title="High Temperature Alert", + message=( + f"Temperature expected to reach {hourly.temp:.0f}°F " + f"at {hourly.datetime.strftime('%I:%M %p on %b %d')}. " + f"Threshold: {temp_rule.above:.0f}°F" + ), + forecast_hour=hourly.hour_key, + value=hourly.temp, + threshold=temp_rule.above, + ) + + return None + + def _check_precipitation( + self, + hourly: HourlyForecast, + ) -> Optional[TriggeredAlert]: + """Check precipitation probability threshold. + + Args: + hourly: The hourly forecast data. + + Returns: + TriggeredAlert if threshold exceeded, None otherwise. + """ + precip_rule = self.rules.precipitation + threshold = precip_rule.probability_above + + if hourly.precip_prob > threshold: + return TriggeredAlert( + alert_type=AlertType.PRECIPITATION, + title="Precipitation Alert", + message=( + f"{hourly.precip_prob:.0f}% chance of precipitation " + f"at {hourly.datetime.strftime('%I:%M %p on %b %d')}. " + f"Conditions: {hourly.conditions}" + ), + forecast_hour=hourly.hour_key, + value=hourly.precip_prob, + threshold=threshold, + ) + + return None + + def _check_wind(self, hourly: HourlyForecast) -> list[TriggeredAlert]: + """Check wind speed and gust thresholds. + + Args: + hourly: The hourly forecast data. + + Returns: + List of triggered wind alerts. + """ + alerts: list[TriggeredAlert] = [] + wind_rule = self.rules.wind + + # Check sustained wind speed + if hourly.wind_speed > wind_rule.speed_above: + alerts.append( + TriggeredAlert( + alert_type=AlertType.WIND_SPEED, + title="High Wind Alert", + message=( + f"Sustained winds of {hourly.wind_speed:.0f} mph expected " + f"at {hourly.datetime.strftime('%I:%M %p on %b %d')}. " + f"Threshold: {wind_rule.speed_above:.0f} mph" + ), + forecast_hour=hourly.hour_key, + value=hourly.wind_speed, + threshold=wind_rule.speed_above, + ) + ) + + # Check wind gusts + if hourly.wind_gust > wind_rule.gust_above: + alerts.append( + TriggeredAlert( + alert_type=AlertType.WIND_GUST, + title="Wind Gust Alert", + message=( + f"Wind gusts up to {hourly.wind_gust:.0f} mph expected " + f"at {hourly.datetime.strftime('%I:%M %p on %b %d')}. " + f"Threshold: {wind_rule.gust_above:.0f} mph" + ), + forecast_hour=hourly.hour_key, + value=hourly.wind_gust, + threshold=wind_rule.gust_above, + ) + ) + + return alerts + + def _evaluate_severe_alerts( + self, + api_alerts: list[WeatherAlert], + ) -> list[TriggeredAlert]: + """Convert API severe weather alerts to triggered alerts. + + Args: + api_alerts: List of WeatherAlert from the API. + + Returns: + List of triggered severe weather alerts. + """ + triggered: list[TriggeredAlert] = [] + + for api_alert in api_alerts: + # Use alert ID as the hour key for deduplication + triggered.append( + TriggeredAlert( + alert_type=AlertType.SEVERE_WEATHER, + title=f"Severe Weather: {api_alert.event}", + message=api_alert.headline or api_alert.description[:200], + forecast_hour=api_alert.id or api_alert.event, + value=1.0, # Placeholder - severe alerts don't have numeric values + threshold=0.0, + ) + ) + + return triggered diff --git a/app/services/state_manager.py b/app/services/state_manager.py new file mode 100644 index 0000000..ad99311 --- /dev/null +++ b/app/services/state_manager.py @@ -0,0 +1,185 @@ +"""State manager for alert deduplication with atomic file persistence.""" + +import json +import os +import tempfile +from pathlib import Path +from typing import Optional, Union + +from app.models.alerts import AggregatedAlert, TriggeredAlert +from app.models.state import AlertState +from app.utils.logging_config import get_logger + +# Type alias for alerts that can be deduplicated +DeduplicableAlert = Union[TriggeredAlert, AggregatedAlert] + + +class StateManagerError(Exception): + """Raised when state management encounters an error.""" + + pass + + +class StateManager: + """Manages alert state persistence for deduplication.""" + + def __init__( + self, + file_path: str, + dedup_window_hours: int = 24, + ) -> None: + """Initialize the state manager. + + Args: + file_path: Path to the state JSON file. + dedup_window_hours: Hours to retain sent alert records. + """ + self.file_path = Path(file_path) + self.dedup_window_hours = dedup_window_hours + self.logger = get_logger(__name__) + self._state: Optional[AlertState] = None + + @property + def state(self) -> AlertState: + """Get the current state, loading from file if necessary.""" + if self._state is None: + self._state = self.load() + return self._state + + def load(self) -> AlertState: + """Load state from file. + + Returns: + AlertState instance, empty if file doesn't exist. + """ + if not self.file_path.exists(): + self.logger.info("state_file_not_found", path=str(self.file_path)) + return AlertState() + + try: + with open(self.file_path) as f: + data = json.load(f) + + state = AlertState.from_dict(data) + self.logger.info( + "state_loaded", + path=str(self.file_path), + record_count=len(state.sent_alerts), + ) + return state + + except json.JSONDecodeError as e: + self.logger.warning( + "state_file_corrupt", + path=str(self.file_path), + error=str(e), + ) + return AlertState() + + def save(self) -> None: + """Save state to file with atomic write. + + Uses write-to-temp-then-rename for crash safety. + """ + if self._state is None: + return + + # Ensure directory exists + self.file_path.parent.mkdir(parents=True, exist_ok=True) + + # Write to temp file first + dir_path = self.file_path.parent + try: + fd, temp_path = tempfile.mkstemp( + suffix=".tmp", + prefix="state_", + dir=dir_path, + ) + try: + with os.fdopen(fd, "w") as f: + json.dump(self._state.to_dict(), f, indent=2) + + # Atomic rename + os.replace(temp_path, self.file_path) + + self.logger.debug( + "state_saved", + path=str(self.file_path), + record_count=len(self._state.sent_alerts), + ) + + except Exception: + # Clean up temp file on error + if os.path.exists(temp_path): + os.unlink(temp_path) + raise + + except OSError as e: + self.logger.error("state_save_failed", error=str(e)) + raise StateManagerError(f"Failed to save state: {e}") + + def filter_duplicates( + self, + alerts: list[DeduplicableAlert], + ) -> list[DeduplicableAlert]: + """Filter out alerts that have already been sent. + + Args: + alerts: List of triggered or aggregated alerts. + + Returns: + List of alerts that haven't been sent within the dedup window. + """ + new_alerts: list[DeduplicableAlert] = [] + + for alert in alerts: + if not self.state.is_duplicate(alert.dedup_key): + new_alerts.append(alert) + else: + self.logger.debug( + "alert_filtered_duplicate", + dedup_key=alert.dedup_key, + ) + + filtered_count = len(alerts) - len(new_alerts) + if filtered_count > 0: + self.logger.info( + "duplicates_filtered", + total=len(alerts), + new=len(new_alerts), + duplicates=filtered_count, + ) + + return new_alerts + + def record_sent(self, alert: DeduplicableAlert) -> None: + """Record that an alert was sent. + + Args: + alert: The triggered or aggregated alert that was sent. + """ + # Get the forecast hour - AggregatedAlert uses start_time, TriggeredAlert uses forecast_hour + if isinstance(alert, AggregatedAlert): + forecast_hour = alert.start_time + else: + forecast_hour = alert.forecast_hour + + self.state.record_sent( + dedup_key=alert.dedup_key, + alert_type=alert.alert_type.value, + forecast_hour=forecast_hour, + ) + self.logger.debug("alert_recorded", dedup_key=alert.dedup_key) + + def purge_old_records(self) -> int: + """Remove records older than the deduplication window. + + Returns: + Number of records purged. + """ + purged = self.state.purge_old_records(self.dedup_window_hours) + + if purged > 0: + self.logger.info("old_records_purged", count=purged) + + return purged diff --git a/app/services/weather_service.py b/app/services/weather_service.py new file mode 100644 index 0000000..8b35086 --- /dev/null +++ b/app/services/weather_service.py @@ -0,0 +1,101 @@ +"""Weather service for fetching forecasts from VisualCrossing API.""" + +from typing import Optional +from urllib.parse import quote + +from app.models.weather import WeatherForecast +from app.utils.http_client import HttpClient +from app.utils.logging_config import get_logger + + +class WeatherServiceError(Exception): + """Raised when the weather service encounters an error.""" + + pass + + +class WeatherService: + """Client for the VisualCrossing Weather API.""" + + BASE_URL = "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline" + + def __init__( + self, + api_key: str, + http_client: Optional[HttpClient] = None, + ) -> None: + """Initialize the weather service. + + Args: + api_key: VisualCrossing API key. + http_client: Optional HTTP client instance. Creates one if not provided. + """ + if not api_key: + raise WeatherServiceError("VisualCrossing API key is required") + + self.api_key = api_key + self.http_client = http_client or HttpClient() + self.logger = get_logger(__name__) + + def get_forecast( + self, + location: str, + hours_ahead: int = 24, + unit_group: str = "us", + ) -> WeatherForecast: + """Fetch weather forecast for a location. + + Args: + location: Location string (e.g., "viola,tn" or ZIP code). + hours_ahead: Number of hours of forecast to retrieve. + unit_group: Unit system ("us" for imperial, "metric" for metric). + + Returns: + WeatherForecast with hourly data and any active alerts. + + Raises: + WeatherServiceError: If the API request fails. + """ + self.logger.info( + "fetching_forecast", + location=location, + hours_ahead=hours_ahead, + ) + + # Build API URL + encoded_location = quote(location, safe="") + url = f"{self.BASE_URL}/{encoded_location}" + + params = { + "unitGroup": unit_group, + "include": "days,hours,alerts,current,events", + "key": self.api_key, + "contentType": "json", + } + + response = self.http_client.get(url, params=params) + + if not response.success: + self.logger.error( + "forecast_fetch_failed", + status_code=response.status_code, + error=response.text, + ) + raise WeatherServiceError( + f"Failed to fetch forecast: {response.status_code} - {response.text}" + ) + + if response.json_data is None: + self.logger.error("forecast_invalid_json", response_text=response.text[:200]) + raise WeatherServiceError("Invalid JSON response from weather API") + + forecast = WeatherForecast.from_api_data(response.json_data, hours_ahead) + + self.logger.info( + "forecast_fetched", + location=forecast.resolved_address, + hourly_count=len(forecast.hourly_forecasts), + alert_count=len(forecast.alerts), + ) + + return forecast diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..5f45a14 --- /dev/null +++ b/app/utils/__init__.py @@ -0,0 +1,6 @@ +"""Utility modules for weather alerts.""" + +from app.utils.http_client import HttpClient +from app.utils.logging_config import configure_logging, get_logger + +__all__ = ["HttpClient", "configure_logging", "get_logger"] diff --git a/app/utils/http_client.py b/app/utils/http_client.py new file mode 100644 index 0000000..4e5710f --- /dev/null +++ b/app/utils/http_client.py @@ -0,0 +1,162 @@ +"""Centralized HTTP client wrapper with retries and consistent error handling.""" + +from dataclasses import dataclass +from typing import Any, Optional + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from app.utils.logging_config import get_logger + + +@dataclass +class HttpResponse: + """Wrapper for HTTP response data.""" + + status_code: int + json_data: Optional[dict[str, Any]] + text: str + success: bool + + +class HttpClient: + """HTTP client with automatic retries, timeouts, and logging.""" + + def __init__( + self, + timeout: int = 30, + max_retries: int = 3, + backoff_factor: float = 0.5, + ) -> None: + """Initialize the HTTP client. + + Args: + timeout: Request timeout in seconds. + max_retries: Maximum number of retry attempts. + backoff_factor: Multiplier for exponential backoff between retries. + """ + self.timeout = timeout + self.logger = get_logger(__name__) + + # Configure retry strategy + retry_strategy = Retry( + total=max_retries, + backoff_factor=backoff_factor, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["GET", "POST"], + ) + + # Create session with retry adapter + self.session = requests.Session() + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + def get( + self, + url: str, + params: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + ) -> HttpResponse: + """Perform a GET request. + + Args: + url: The URL to request. + params: Optional query parameters. + headers: Optional HTTP headers. + + Returns: + HttpResponse with status, data, and success flag. + """ + self.logger.debug("http_get", url=url, params=params) + + try: + response = self.session.get( + url, + params=params, + headers=headers, + timeout=self.timeout, + ) + return self._build_response(response) + except requests.RequestException as e: + self.logger.error("http_get_failed", url=url, error=str(e)) + return HttpResponse( + status_code=0, + json_data=None, + text=str(e), + success=False, + ) + + def post( + self, + url: str, + data: Optional[dict[str, Any]] = None, + json_data: Optional[dict[str, Any]] = None, + headers: Optional[dict[str, str]] = None, + ) -> HttpResponse: + """Perform a POST request. + + Args: + url: The URL to request. + data: Optional form data. + json_data: Optional JSON data. + headers: Optional HTTP headers. + + Returns: + HttpResponse with status, data, and success flag. + """ + self.logger.debug("http_post", url=url) + + try: + response = self.session.post( + url, + data=data, + json=json_data, + headers=headers, + timeout=self.timeout, + ) + return self._build_response(response) + except requests.RequestException as e: + self.logger.error("http_post_failed", url=url, error=str(e)) + return HttpResponse( + status_code=0, + json_data=None, + text=str(e), + success=False, + ) + + def _build_response(self, response: requests.Response) -> HttpResponse: + """Build an HttpResponse from a requests Response. + + Args: + response: The requests library Response object. + + Returns: + HttpResponse wrapper. + """ + json_data = None + if response.headers.get("content-type", "").startswith("application/json"): + try: + json_data = response.json() + except ValueError: + pass + + success = 200 <= response.status_code < 300 + + self.logger.debug( + "http_response", + status_code=response.status_code, + success=success, + ) + + return HttpResponse( + status_code=response.status_code, + json_data=json_data, + text=response.text, + success=success, + ) + + def close(self) -> None: + """Close the HTTP session.""" + self.session.close() diff --git a/app/utils/logging_config.py b/app/utils/logging_config.py new file mode 100644 index 0000000..4db10e3 --- /dev/null +++ b/app/utils/logging_config.py @@ -0,0 +1,54 @@ +"""Structlog configuration for consistent logging throughout the application.""" + +import logging +import sys +from typing import Optional + +import structlog + + +def configure_logging(log_level: str = "INFO") -> None: + """Configure structlog with console output. + + Args: + log_level: The logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). + """ + level = getattr(logging, log_level.upper(), logging.INFO) + + # Configure standard library logging + logging.basicConfig( + format="%(message)s", + stream=sys.stdout, + level=level, + ) + + # Configure structlog + structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.UnicodeDecoder(), + structlog.dev.ConsoleRenderer(colors=sys.stdout.isatty()), + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) + + +def get_logger(name: Optional[str] = None) -> structlog.stdlib.BoundLogger: + """Get a configured logger instance. + + Args: + name: Optional name for the logger. If not provided, uses the calling module. + + Returns: + A configured structlog logger. + """ + return structlog.get_logger(name) diff --git a/docs/CLAUDE.md b/docs/CLAUDE.md new file mode 100644 index 0000000..caae709 --- /dev/null +++ b/docs/CLAUDE.md @@ -0,0 +1,20 @@ +# Documentation Folder + +This folder contains business planning, architecture decisions, and documentation. + +**No application code belongs here.** + +## Contents + +| Document | Purpose | +|----------|---------| +| `code_guidelinees.md` | Code creation guidelines and Git strategy | +| `security.md` | Python focused security code suggestions | + + +## Guidelines + +- Keep documents focused and concise +- Update docs when architecture decisions change +- Use markdown tables for structured information +- Include links to external documentation where relevant diff --git a/docs/code_guidelines.md b/docs/code_guidelines.md new file mode 100644 index 0000000..4db7531 --- /dev/null +++ b/docs/code_guidelines.md @@ -0,0 +1,120 @@ +### Coding Standards + +**Style & Structure** +- Prefer longer, explicit code over compact one-liners +- Always include docstrings for functions/classes + inline comments +- Strongly prefer OOP-style code (classes over functional/nested functions) +- Strong typing throughout (dataclasses, TypedDict, Enums, type hints) +- Value future-proofing and expanded usage insights + +**Data Design** +- Use dataclasses for internal data modeling +- Typed JSON structures +- Functions return fully typed objects (no loose dicts) +- Snapshot files in JSON or YAML +- Human-readable fields (e.g., `scan_duration`) + +**Templates & UI** +- Don't mix large HTML/CSS blocks in Python code +- Prefer Jinja templates for HTML rendering +- Clean CSS, minimal inline clutter, readable template logic + +**Writing & Documentation** +- Markdown documentation +- Clear section headers +- Roadmap/Phase/Feature-Session style documents +- Boilerplate templates first, then refinements + +**Logging** +- Use structlog (pip package) +- Setup logging at app start: `logger = logging.get_logger(__file__)` + +**Preferred Pip Packages** +- API/Web Server: Flask +- HTTP: Requests +- Logging: Structlog +- Scheduling: APScheduler + +### Error Handling +- Custom exception classes for domain-specific errors +- Consistent error response formats (JSON structure) +- Logging severity levels (ERROR vs WARNING) + +### Configuration +- Each component has environment-specific configs in its own `/config/*.yaml` + - API: `/api/config/development.yaml`, `/api/config/production.yaml` + - Web: `/public_web/config/development.yaml`, `/public_web/config/production.yaml` +- `.env` for secrets (never committed) +- Maintain `.env.example` in each component for documentation +- Typed config loaders using dataclasses +- Validation on startup + +### Containerization & Deployment +- Explicit Dockerfiles +- Production-friendly hardening (distroless/slim when meaningful) +- Clear build/push scripts that: + - Use git branch as tag + - Ask whether to tag `:latest` + - Ask whether to push + - Support private registries + +### API Design +- RESTful conventions +- Versioning strategy (`/api/v1/...`) +- Standardized response format: + +```json +{ + "app": "", + "version": "", + "status": , + "timestamp": "", + "request_id": "", + "result": , + "error": { + "code": "", + "message": "", + "details": {} + }, + "meta": {} +} +``` + +### Dependency Management +- Use `requirements.txt` and virtual environments (`python3 -m venv venv`) +- Use path `venv` for all virtual environments +- Pin versions to version ranges +- Activate venv before running code (unless in Docker) + +### Testing Standards +- Manual testing preferred for applications +- **API Backend:** Maintain `api/docs/API_TESTING.md` with endpoint examples, curl/httpie commands, expected responses +- **Unit tests:** Use pytest for API backend (`api/tests/`) +- **Web Frontend:** If using a web frontend, Manual testing checklist are created in `public_web/docs` + +### Git Standards + +**Branch Strategy:** +- `master` - Production-ready code only +- `dev` - Main development branch, integration point +- `beta` - (Optional) Public pre-release testing + +**Workflow:** +- Feature work branches off `dev` (e.g., `feature/add-scheduler`) +- Merge features back to `dev` for testing +- Promote `dev` → `beta` for public testing (when applicable) +- Promote `beta` (or `dev`) → `master` for production + +**Commit Messages:** +- Use conventional commit format: `feat:`, `fix:`, `docs:`, `refactor:`, etc. +- Keep commits atomic and focused +- Write clear, descriptive messages + +**Tagging:** +- Tag releases on `master` with semantic versioning (e.g., `v1.2.3`) +- Optionally tag beta releases (e.g., `v1.2.3-beta.1`) + +--- + +## Workflow Preference +I follow a pattern: **brainstorm → design → code → revise** \ No newline at end of file diff --git a/docs/implementation_roadmap.md b/docs/implementation_roadmap.md new file mode 100644 index 0000000..7f58b42 --- /dev/null +++ b/docs/implementation_roadmap.md @@ -0,0 +1,164 @@ +# Weather Alerts - Implementation Roadmap + +## Overview +This document captures the implementation plan and decisions for the weather alerts application. + +## Architecture + +### Directory Structure +``` +weather_alerts/ +├── app/ +│ ├── __init__.py +│ ├── main.py # Entry point - orchestration +│ ├── config/ +│ │ ├── __init__.py +│ │ ├── loader.py # Typed YAML config loader +│ │ ├── settings.yaml # Main configuration +│ │ └── settings.example.yaml # Example (no secrets) +│ ├── models/ +│ │ ├── __init__.py +│ │ ├── weather.py # Weather data models +│ │ ├── alerts.py # Alert rule/triggered alert models +│ │ └── state.py # Deduplication state models +│ ├── services/ +│ │ ├── __init__.py +│ │ ├── weather_service.py # VisualCrossing API client +│ │ ├── notification_service.py # ntfy sender +│ │ ├── rule_engine.py # Alert rule evaluation +│ │ └── state_manager.py # Deduplication persistence +│ └── utils/ +│ ├── __init__.py +│ ├── http_client.py # Centralized HTTP wrapper +│ └── logging_config.py # Structlog setup +├── tests/ +├── data/ # State files (gitignored) +├── .env.example +├── .gitignore +├── requirements.txt +└── run.py # Simple runner +``` + +## Key Design Decisions + +### 1. One-Shot Execution +The application is designed for cron-based scheduling rather than internal scheduling. Each run: +1. Fetches current forecast +2. Evaluates rules +3. Sends new alerts +4. Exits + +### 2. Secrets Management +- API keys and tokens stored in environment variables only +- `.env` file for local development (gitignored) +- No secrets in YAML configuration + +### 3. Atomic State Persistence +State file writes use write-to-temp-then-rename pattern for crash safety: +```python +fd, temp_path = tempfile.mkstemp(...) +# Write to temp file +os.replace(temp_path, actual_path) # Atomic rename +``` + +### 4. Deduplication Strategy +Alerts are deduplicated using the key format: `{alert_type}:{forecast_hour}` + +This allows: +- Re-alerting for different forecast hours +- Preventing duplicate alerts for the same condition/hour +- Configurable deduplication window (default 24 hours) + +### 5. Typed Throughout +- Dataclasses for all data models +- Type hints on all functions +- Configuration loaded into typed dataclasses + +### 6. Centralized HTTP +Single `HttpClient` wrapper provides: +- Consistent timeouts +- Automatic retries with exponential backoff +- Structured logging +- Shared across services + +## Alert Rules + +### Temperature +- **Low temperature**: Alert when forecast drops below threshold (default: 32°F) +- **High temperature**: Alert when forecast exceeds threshold (default: 100°F) + +### Precipitation +- Alert when precipitation probability exceeds threshold (default: 70%) + +### Wind +- **Sustained wind**: Alert when speed exceeds threshold (default: 25 mph) +- **Wind gusts**: Alert when gusts exceed threshold (default: 40 mph) + +### Severe Weather +- Forwards any severe weather alerts from the VisualCrossing API + +## Execution Flow + +``` +1. Load configuration (YAML + environment) +2. Configure logging (structlog) +3. Initialize services +4. Fetch weather forecast +5. Evaluate alert rules +6. Filter duplicate alerts +7. Send new alerts via ntfy +8. Record sent alerts +9. Purge old records +10. Save state +11. Exit (0=success, 1=error) +``` + +## Dependencies + +| Package | Purpose | +|---------|---------| +| requests | HTTP client | +| structlog | Structured logging | +| python-dotenv | Environment variable loading | +| PyYAML | Configuration parsing | +| pytest | Testing | +| responses | HTTP mocking for tests | + +## Testing + +Run tests with: +```bash +pytest tests/ -v +``` + +Test coverage: +- Rule engine: Temperature, precipitation, wind, severe weather rules +- Weather service: API parsing, error handling +- State manager: Persistence, deduplication, purging + +## Usage + +### Setup +1. Copy `.env.example` to `.env` and fill in API keys +2. Copy `app/config/settings.example.yaml` to `app/config/settings.yaml` (optional customization) +3. Install dependencies: `pip install -r requirements.txt` + +### Running +```bash +python run.py +``` + +### Cron Integration +Add to crontab for hourly checks: +```cron +0 * * * * cd /path/to/weather_alerts && /path/to/python run.py >> /var/log/weather-alerts.log 2>&1 +``` + +## Future Enhancements + +Potential additions if needed: +- Additional alert types (UV index, humidity, etc.) +- Multiple location support +- Alert cooldown periods +- Notification channels beyond ntfy +- Web dashboard for status diff --git a/docs/security.md b/docs/security.md new file mode 100644 index 0000000..8bcbe03 --- /dev/null +++ b/docs/security.md @@ -0,0 +1,46 @@ +## Foundational Security Instructions + +- Act as a security-aware software engineer generating secure Python code. +- Produce implementations that are **secure-by-design and secure-by-default**, not merely cosmetically "secured." +- Focus on **preventing vulnerabilities**, not renaming functions or adding superficial security wrappers. +- Explicitly identify **trust boundaries** (user input, external systems, internal components) and apply stricter controls at all boundary crossings. +- Treat **all external input as untrusted by default**, regardless of source, and validate or sanitize it before use. +- Explicitly consider **data sensitivity** (e.g., public, internal, confidential, regulated) and enforce controls appropriate to the highest sensitivity level involved. +- Clearly distinguish between **authentication**, **authorization**, and **session management**, and never conflate their responsibilities. +- Ensure implementations **fail securely**: errors, exceptions, and edge cases MUST NOT expose sensitive data or weaken security guarantees. +- Use inline comments (when generating code) to clearly highlight critical security controls, assumptions, and security-relevant design decisions. +- Adhere strictly to OWASP best practices, with particular consideration for the OWASP ASVS. +- **Avoid slopsquatting and dependency confusion**: never guess package names or APIs; only reference well-known, reputable, and maintained libraries. Explicitly note any uncommon or low-reputation dependencies. +- Do not hardcode secrets, credentials, tokens, or cryptographic material. Always require secure external configuration or secret management mechanisms. + +--- + +## Common Weaknesses for Python + +### CWE-79: Improper Neutralization of Input During Web Page Generation ('Cross-site Scripting') +**Summary:** Failure to properly sanitize or encode user input can lead to injection of malicious scripts into web pages, enabling XSS attacks. +**Mitigation Rule:** All user input rendered in web pages MUST be sanitized and contextually encoded using a secure library such as `bleach` or `html.escape`. + +### CWE-89: Improper Neutralization of Special Elements used in an SQL Command ('SQL Injection') +**Summary:** Unsanitized user input in SQL queries can allow attackers to execute arbitrary SQL commands, compromising data integrity and confidentiality. +**Mitigation Rule:** SQL queries MUST use parameterized statements or prepared statements provided by libraries such as `sqlite3` or `SQLAlchemy`. Direct concatenation of user input into queries MUST NOT be used. + +### CWE-327: Use of a Broken or Risky Cryptographic Algorithm +**Summary:** Using outdated or insecure cryptographic algorithms can compromise data confidentiality and integrity. +**Mitigation Rule:** Cryptographic operations MUST use secure algorithms provided by the `cryptography` library. Deprecated algorithms such as MD5 or SHA-1 MUST NOT be used. + +### CWE-798: Use of Hard-coded Credentials +**Summary:** Hardcoding credentials in source code can lead to unauthorized access if the code is exposed or leaked. +**Mitigation Rule:** Secrets, credentials, and tokens MUST be stored securely using environment variables, secret management tools, or configuration files outside the source code repository. + +### CWE-200: Exposure of Sensitive Information to an Unauthorized Actor +**Summary:** Improper error handling or logging can expose sensitive data to unauthorized users. +**Mitigation Rule:** Error messages and logs MUST NOT include sensitive information such as stack traces, database connection strings, or user credentials. Use logging libraries such as `logging` with appropriate log levels and sanitization. + +### CWE-502: Deserialization of Untrusted Data +**Summary:** Deserializing untrusted data can lead to arbitrary code execution or data tampering. +**Mitigation Rule:** Deserialization MUST only be performed on trusted data sources. Unsafe libraries such as `pickle` MUST NOT be used for deserialization of untrusted input. + +### CWE-829: Inclusion of Functionality from Untrusted Control Sphere +**Summary:** Using dependencies or code from untrusted sources can introduce malicious functionality or vulnerabilities. +**Mitigation Rule:** Dependencies MUST be sourced from reputable package repositories such as PyPI. Verify the integrity and reputation of packages before use, and pin dependency versions to avoid supply chain attacks. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..12f3989 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +requests>=2.28.0,<3.0.0 +structlog>=23.0.0,<25.0.0 +python-dotenv>=1.0.0,<2.0.0 +PyYAML>=6.0,<7.0 +pytest>=7.0.0,<9.0.0 +responses>=0.23.0,<1.0.0 diff --git a/run.py b/run.py new file mode 100755 index 0000000..ea2c6f0 --- /dev/null +++ b/run.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +"""Convenience script to run the weather alerts application.""" + +import sys +from pathlib import Path + +# Add the project root to the Python path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +from app.main import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..44e6269 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Test suite for weather alerts.""" diff --git a/tests/test_rule_engine.py b/tests/test_rule_engine.py new file mode 100644 index 0000000..808ded1 --- /dev/null +++ b/tests/test_rule_engine.py @@ -0,0 +1,319 @@ +"""Tests for the rule engine.""" + +from datetime import datetime + +import pytest + +from app.models.alerts import ( + AlertRules, + AlertType, + PrecipitationRule, + SevereWeatherRule, + TemperatureRule, + WindRule, +) +from app.models.weather import HourlyForecast, WeatherAlert, WeatherForecast +from app.services.rule_engine import RuleEngine + + +def make_hourly_forecast( + temp: float = 70, + precip_prob: float = 0, + wind_speed: float = 5, + wind_gust: float = 10, + hour_offset: int = 0, +) -> HourlyForecast: + """Create a test HourlyForecast.""" + base_epoch = int(datetime.now().timestamp()) + (hour_offset * 3600) + return HourlyForecast( + datetime_str=datetime.fromtimestamp(base_epoch).strftime("%H:%M:%S"), + datetime_epoch=base_epoch, + temp=temp, + feelslike=temp, + humidity=50, + precip=0, + precip_prob=precip_prob, + snow=0, + snow_depth=0, + wind_speed=wind_speed, + wind_gust=wind_gust, + wind_dir=180, + pressure=30, + visibility=10, + cloud_cover=20, + uv_index=5, + conditions="Clear", + icon="clear-day", + ) + + +class TestTemperatureRules: + """Test temperature alert rules.""" + + def test_low_temperature_triggers_alert(self) -> None: + """Test that temperature below threshold triggers alert.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=True, below=32, above=100), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=False), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[make_hourly_forecast(temp=28, hour_offset=1)], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 1 + assert triggered[0].alert_type == AlertType.TEMPERATURE_LOW + assert triggered[0].value == 28 + assert triggered[0].threshold == 32 + + def test_high_temperature_triggers_alert(self) -> None: + """Test that temperature above threshold triggers alert.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=True, below=32, above=100), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=False), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[make_hourly_forecast(temp=105, hour_offset=1)], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 1 + assert triggered[0].alert_type == AlertType.TEMPERATURE_HIGH + assert triggered[0].value == 105 + + def test_normal_temperature_no_alert(self) -> None: + """Test that normal temperature doesn't trigger alert.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=True, below=32, above=100), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=False), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[make_hourly_forecast(temp=70, hour_offset=1)], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 0 + + def test_disabled_rule_no_alert(self) -> None: + """Test that disabled rules don't trigger alerts.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=False, below=32, above=100), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=False), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[make_hourly_forecast(temp=20, hour_offset=1)], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 0 + + +class TestPrecipitationRules: + """Test precipitation alert rules.""" + + def test_high_precipitation_triggers_alert(self) -> None: + """Test that high precipitation probability triggers alert.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=False), + precipitation=PrecipitationRule(enabled=True, probability_above=70), + wind=WindRule(enabled=False), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[make_hourly_forecast(precip_prob=85, hour_offset=1)], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 1 + assert triggered[0].alert_type == AlertType.PRECIPITATION + assert triggered[0].value == 85 + + +class TestWindRules: + """Test wind alert rules.""" + + def test_high_wind_speed_triggers_alert(self) -> None: + """Test that high wind speed triggers alert.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=False), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=True, speed_above=25, gust_above=40), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[make_hourly_forecast(wind_speed=30, hour_offset=1)], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 1 + assert triggered[0].alert_type == AlertType.WIND_SPEED + + def test_high_wind_gust_triggers_alert(self) -> None: + """Test that high wind gust triggers alert.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=False), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=True, speed_above=25, gust_above=40), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[make_hourly_forecast(wind_gust=50, hour_offset=1)], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 1 + assert triggered[0].alert_type == AlertType.WIND_GUST + + def test_both_wind_conditions_trigger_two_alerts(self) -> None: + """Test that high wind speed and gust trigger separate alerts.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=False), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=True, speed_above=25, gust_above=40), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[ + make_hourly_forecast(wind_speed=30, wind_gust=50, hour_offset=1) + ], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 2 + alert_types = {t.alert_type for t in triggered} + assert AlertType.WIND_SPEED in alert_types + assert AlertType.WIND_GUST in alert_types + + +class TestSevereWeatherRules: + """Test severe weather alert forwarding.""" + + def test_severe_weather_alert_forwarded(self) -> None: + """Test that severe weather alerts from API are forwarded.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=False), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=False), + severe_weather=SevereWeatherRule(enabled=True), + ) + engine = RuleEngine(rules) + + api_alert = WeatherAlert( + event="Tornado Warning", + headline="Tornado Warning in effect", + description="A tornado has been spotted in the area.", + onset="2024-01-15T12:00:00", + ends="2024-01-15T14:00:00", + id="NWS-123", + language="en", + link="https://weather.gov/alerts", + ) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[], + alerts=[api_alert], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 1 + assert triggered[0].alert_type == AlertType.SEVERE_WEATHER + assert "Tornado Warning" in triggered[0].title + + +class TestMultipleHours: + """Test evaluation across multiple forecast hours.""" + + def test_multiple_hours_can_trigger_alerts(self) -> None: + """Test that alerts can trigger for different hours.""" + rules = AlertRules( + temperature=TemperatureRule(enabled=True, below=32, above=100), + precipitation=PrecipitationRule(enabled=False), + wind=WindRule(enabled=False), + severe_weather=SevereWeatherRule(enabled=False), + ) + engine = RuleEngine(rules) + + forecast = WeatherForecast( + location="test", + resolved_address="Test Location", + timezone="America/Chicago", + hourly_forecasts=[ + make_hourly_forecast(temp=28, hour_offset=1), + make_hourly_forecast(temp=70, hour_offset=2), + make_hourly_forecast(temp=25, hour_offset=3), + ], + alerts=[], + ) + + triggered = engine.evaluate(forecast) + + assert len(triggered) == 2 + assert all(t.alert_type == AlertType.TEMPERATURE_LOW for t in triggered) diff --git a/tests/test_state_manager.py b/tests/test_state_manager.py new file mode 100644 index 0000000..eee8184 --- /dev/null +++ b/tests/test_state_manager.py @@ -0,0 +1,175 @@ +"""Tests for the state manager.""" + +import json +import tempfile +from datetime import datetime, timedelta +from pathlib import Path + +import pytest + +from app.models.alerts import AlertType, TriggeredAlert +from app.services.state_manager import StateManager + + +def make_triggered_alert( + alert_type: AlertType = AlertType.TEMPERATURE_LOW, + forecast_hour: str = "2024-01-15-12", +) -> TriggeredAlert: + """Create a test triggered alert.""" + return TriggeredAlert( + alert_type=alert_type, + title="Test Alert", + message="Test message", + forecast_hour=forecast_hour, + value=25.0, + threshold=32.0, + ) + + +class TestStateManager: + """Test state manager functionality.""" + + def test_load_creates_empty_state_if_no_file(self, tmp_path: Path) -> None: + """Test that loading from nonexistent file creates empty state.""" + state_file = tmp_path / "state.json" + manager = StateManager(str(state_file)) + + state = manager.load() + + assert len(state.sent_alerts) == 0 + + def test_save_creates_file(self, tmp_path: Path) -> None: + """Test that saving creates the state file.""" + state_file = tmp_path / "subdir" / "state.json" + manager = StateManager(str(state_file)) + + alert = make_triggered_alert() + manager.record_sent(alert) + manager.save() + + assert state_file.exists() + + def test_save_and_load_round_trip(self, tmp_path: Path) -> None: + """Test that state survives save/load cycle.""" + state_file = tmp_path / "state.json" + manager = StateManager(str(state_file)) + + alert = make_triggered_alert() + manager.record_sent(alert) + manager.save() + + # Create new manager and load + manager2 = StateManager(str(state_file)) + state = manager2.load() + + assert len(state.sent_alerts) == 1 + assert alert.dedup_key in state.sent_alerts + + def test_filter_duplicates_removes_known_alerts(self, tmp_path: Path) -> None: + """Test that known alerts are filtered out.""" + state_file = tmp_path / "state.json" + manager = StateManager(str(state_file)) + + # Record an alert as sent + alert1 = make_triggered_alert(forecast_hour="2024-01-15-12") + manager.record_sent(alert1) + + # Try to send the same alert again plus a new one + alert2 = make_triggered_alert(forecast_hour="2024-01-15-12") # Duplicate + alert3 = make_triggered_alert(forecast_hour="2024-01-15-13") # New + + filtered = manager.filter_duplicates([alert2, alert3]) + + assert len(filtered) == 1 + assert filtered[0].forecast_hour == "2024-01-15-13" + + def test_different_alert_types_not_duplicates(self, tmp_path: Path) -> None: + """Test that different alert types for same hour aren't duplicates.""" + state_file = tmp_path / "state.json" + manager = StateManager(str(state_file)) + + alert1 = make_triggered_alert( + alert_type=AlertType.TEMPERATURE_LOW, forecast_hour="2024-01-15-12" + ) + manager.record_sent(alert1) + + alert2 = make_triggered_alert( + alert_type=AlertType.PRECIPITATION, forecast_hour="2024-01-15-12" + ) + + filtered = manager.filter_duplicates([alert2]) + + assert len(filtered) == 1 + + def test_purge_old_records(self, tmp_path: Path) -> None: + """Test that old records are purged.""" + state_file = tmp_path / "state.json" + manager = StateManager(str(state_file), dedup_window_hours=24) + + # Manually create an old record + alert = make_triggered_alert() + manager.state.sent_alerts[alert.dedup_key] = manager.state.sent_alerts.get( + alert.dedup_key + ) or type( + "SentAlertRecord", + (), + { + "dedup_key": alert.dedup_key, + "alert_type": alert.alert_type.value, + "sent_at": datetime.now() - timedelta(hours=48), + "forecast_hour": alert.forecast_hour, + "to_dict": lambda self: { + "dedup_key": self.dedup_key, + "alert_type": self.alert_type, + "sent_at": self.sent_at.isoformat(), + "forecast_hour": self.forecast_hour, + }, + }, + )() + + # Use the actual model + from app.models.state import SentAlertRecord + + manager.state.sent_alerts[alert.dedup_key] = SentAlertRecord( + dedup_key=alert.dedup_key, + alert_type=alert.alert_type.value, + sent_at=datetime.now() - timedelta(hours=48), + forecast_hour=alert.forecast_hour, + ) + + purged = manager.purge_old_records() + + assert purged == 1 + assert len(manager.state.sent_alerts) == 0 + + def test_atomic_write_creates_temp_file(self, tmp_path: Path) -> None: + """Test that atomic write doesn't leave temp files on success.""" + state_file = tmp_path / "state.json" + manager = StateManager(str(state_file)) + + alert = make_triggered_alert() + manager.record_sent(alert) + manager.save() + + # Check no temp files remain + temp_files = list(tmp_path.glob("state_*.tmp")) + assert len(temp_files) == 0 + + def test_load_handles_corrupt_json(self, tmp_path: Path) -> None: + """Test that corrupt JSON is handled gracefully.""" + state_file = tmp_path / "state.json" + state_file.write_text("not valid json {{{") + + manager = StateManager(str(state_file)) + state = manager.load() + + # Should return empty state + assert len(state.sent_alerts) == 0 + + def test_dedup_key_format(self) -> None: + """Test that dedup key has expected format.""" + alert = make_triggered_alert( + alert_type=AlertType.TEMPERATURE_LOW, forecast_hour="2024-01-15-12" + ) + + assert alert.dedup_key == "temperature_low:2024-01-15-12" diff --git a/tests/test_weather_service.py b/tests/test_weather_service.py new file mode 100644 index 0000000..527f1c3 --- /dev/null +++ b/tests/test_weather_service.py @@ -0,0 +1,175 @@ +"""Tests for the weather service.""" + +import pytest +import responses + +from app.services.weather_service import WeatherService, WeatherServiceError +from app.utils.http_client import HttpClient + + +SAMPLE_API_RESPONSE = { + "address": "viola,tn", + "resolvedAddress": "Viola, TN, United States", + "timezone": "America/Chicago", + "days": [ + { + "datetime": "2024-01-15", + "hours": [ + { + "datetime": "12:00:00", + "datetimeEpoch": 2000000000, # Future timestamp + "temp": 45.0, + "feelslike": 42.0, + "humidity": 65.0, + "precip": 0.0, + "precipprob": 20.0, + "snow": 0.0, + "snowdepth": 0.0, + "windspeed": 10.0, + "windgust": 15.0, + "winddir": 180.0, + "pressure": 30.1, + "visibility": 10.0, + "cloudcover": 50.0, + "uvindex": 3, + "conditions": "Partially cloudy", + "icon": "partly-cloudy-day", + }, + { + "datetime": "13:00:00", + "datetimeEpoch": 2000003600, + "temp": 48.0, + "feelslike": 45.0, + "humidity": 60.0, + "precip": 0.0, + "precipprob": 15.0, + "snow": 0.0, + "snowdepth": 0.0, + "windspeed": 12.0, + "windgust": 18.0, + "winddir": 185.0, + "pressure": 30.0, + "visibility": 10.0, + "cloudcover": 45.0, + "uvindex": 4, + "conditions": "Partially cloudy", + "icon": "partly-cloudy-day", + }, + ], + } + ], + "alerts": [ + { + "event": "Wind Advisory", + "headline": "Wind Advisory in effect until 6 PM", + "description": "Gusty winds expected throughout the day.", + "onset": "2024-01-15T08:00:00", + "ends": "2024-01-15T18:00:00", + "id": "NWS-456", + "language": "en", + "link": "https://weather.gov/alerts/456", + } + ], +} + + +class TestWeatherService: + """Test weather service functionality.""" + + def test_requires_api_key(self) -> None: + """Test that service requires an API key.""" + with pytest.raises(WeatherServiceError, match="API key is required"): + WeatherService(api_key="") + + @responses.activate + def test_get_forecast_success(self) -> None: + """Test successful forecast retrieval.""" + responses.add( + responses.GET, + "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/viola%2Ctn", + json=SAMPLE_API_RESPONSE, + status=200, + ) + + service = WeatherService(api_key="test-key") + forecast = service.get_forecast("viola,tn", hours_ahead=24) + + assert forecast.location == "viola,tn" + assert forecast.resolved_address == "Viola, TN, United States" + assert len(forecast.hourly_forecasts) == 2 + assert len(forecast.alerts) == 1 + assert forecast.alerts[0].event == "Wind Advisory" + + @responses.activate + def test_get_forecast_parses_hourly_data(self) -> None: + """Test that hourly forecast data is correctly parsed.""" + responses.add( + responses.GET, + "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/viola%2Ctn", + json=SAMPLE_API_RESPONSE, + status=200, + ) + + service = WeatherService(api_key="test-key") + forecast = service.get_forecast("viola,tn") + + hour = forecast.hourly_forecasts[0] + assert hour.temp == 45.0 + assert hour.feelslike == 42.0 + assert hour.humidity == 65.0 + assert hour.precip_prob == 20.0 + assert hour.wind_speed == 10.0 + assert hour.wind_gust == 15.0 + assert hour.conditions == "Partially cloudy" + + @responses.activate + def test_get_forecast_handles_api_error(self) -> None: + """Test handling of API errors.""" + responses.add( + responses.GET, + "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/viola%2Ctn", + json={"error": "Invalid API key"}, + status=401, + ) + + service = WeatherService(api_key="bad-key") + + with pytest.raises(WeatherServiceError, match="Failed to fetch forecast"): + service.get_forecast("viola,tn") + + @responses.activate + def test_get_forecast_handles_invalid_json(self) -> None: + """Test handling of invalid JSON response.""" + responses.add( + responses.GET, + "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/viola%2Ctn", + body="not json", + status=200, + content_type="text/plain", + ) + + service = WeatherService(api_key="test-key") + + with pytest.raises(WeatherServiceError, match="Invalid JSON"): + service.get_forecast("viola,tn") + + @responses.activate + def test_location_encoding(self) -> None: + """Test that location is properly URL encoded.""" + responses.add( + responses.GET, + "https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/Nashville%2C%20TN", + json={ + "address": "Nashville, TN", + "resolvedAddress": "Nashville, TN, United States", + "timezone": "America/Chicago", + "days": [], + "alerts": [], + }, + status=200, + ) + + service = WeatherService(api_key="test-key") + forecast = service.get_forecast("Nashville, TN") + + assert forecast.resolved_address == "Nashville, TN, United States"