"""Main orchestration module for weather alerts application.""" import sys from typing import Optional from app.config.loader import AppConfig, load_config from app.models.ai_summary import ForecastContext from app.models.alerts import AggregatedAlert from app.models.weather import WeatherForecast from app.services.ai_summary_service import AISummaryService, AISummaryServiceError from app.services.alert_aggregator import AlertAggregator from app.services.change_detector import ChangeDetector, ChangeReport from app.services.notification_service import NotificationService from app.services.rule_engine import RuleEngine from app.services.state_manager import StateManager from app.services.weather_service import WeatherService, WeatherServiceError from app.utils.http_client import HttpClient from app.utils.logging_config import configure_logging, get_logger class WeatherAlertsApp: """Main application class for weather alerts.""" def __init__(self, config: AppConfig) -> None: """Initialize the application with configuration. Args: config: The application configuration. """ self.config = config self.logger = get_logger(__name__) # Initialize HTTP client (shared across services) self.http_client = HttpClient() # Initialize services self.weather_service = WeatherService( api_key=config.weather.api_key, http_client=self.http_client, ) self.rule_engine = RuleEngine(rules=config.alerts.rules) self.state_manager = StateManager( file_path=config.state.file_path, dedup_window_hours=config.state.dedup_window_hours, ) self.alert_aggregator = AlertAggregator() self.notification_service = NotificationService( server_url=config.notifications.ntfy.server_url, topic=config.notifications.ntfy.topic, access_token=config.notifications.ntfy.access_token, priority=config.notifications.ntfy.priority, default_tags=config.notifications.ntfy.tags, http_client=self.http_client, ) # Initialize AI services conditionally self.ai_enabled = config.ai.enabled and bool(config.ai.api_token) self.ai_summary_service: Optional[AISummaryService] = None self.change_detector: Optional[ChangeDetector] = None if self.ai_enabled: self.ai_summary_service = AISummaryService( api_token=config.ai.api_token, model=config.ai.model, api_timeout=config.ai.api_timeout, max_tokens=config.ai.max_tokens, http_client=self.http_client, ) self.logger.info("ai_summary_enabled", model=config.ai.model) if config.change_detection.enabled: self.change_detector = ChangeDetector( thresholds=config.change_detection.thresholds ) def run(self) -> int: """Execute the main application flow. Returns: Exit code (0 for success, 1 for error). """ self.logger.info( "app_starting", version=self.config.app.version, location=self.config.weather.location, ai_enabled=self.ai_enabled, ) try: # Step 1: Fetch weather forecast self.logger.info("step_fetch_forecast") forecast = self.weather_service.get_forecast( location=self.config.weather.location, hours_ahead=self.config.weather.hours_ahead, unit_group=self.config.weather.unit_group, ) # Step 2: Evaluate rules against forecast self.logger.info("step_evaluate_rules") triggered_alerts = self.rule_engine.evaluate(forecast) if not triggered_alerts: self.logger.info("no_alerts_triggered") # Clear snapshots when no alerts self.state_manager.save_alert_snapshots([]) self.state_manager.save() return 0 self.logger.info( "alerts_triggered", count=len(triggered_alerts), ) # Step 2.5: Aggregate alerts by type self.logger.info("step_aggregate_alerts") aggregated_alerts = self.alert_aggregator.aggregate(triggered_alerts) self.logger.info( "alerts_aggregated", input_count=len(triggered_alerts), output_count=len(aggregated_alerts), ) # Branch based on AI enabled if self.ai_enabled: return self._run_ai_flow(forecast, aggregated_alerts) else: return self._run_standard_flow(aggregated_alerts) except WeatherServiceError as e: self.logger.error("weather_service_error", error=str(e)) return 1 except Exception as e: self.logger.exception("unexpected_error", error=str(e)) return 1 finally: self.http_client.close() def _run_standard_flow(self, aggregated_alerts: list[AggregatedAlert]) -> int: """Run the standard notification flow without AI. Args: aggregated_alerts: List of aggregated alerts. Returns: Exit code (0 for success, 1 for error). """ # Filter duplicates self.logger.info("step_filter_duplicates") new_alerts = self.state_manager.filter_duplicates(aggregated_alerts) if not new_alerts: self.logger.info("all_alerts_are_duplicates") self._finalize(aggregated_alerts) return 0 # Send notifications self.logger.info( "step_send_notifications", count=len(new_alerts), ) results = self.notification_service.send_batch(new_alerts) # Record sent alerts self.logger.info("step_record_sent") for result in results: if result.success: self.state_manager.record_sent(result.alert) self._finalize(aggregated_alerts) # Report results success_count = sum(1 for r in results if r.success) failed_count = len(results) - success_count self.logger.info( "app_complete", alerts_sent=success_count, alerts_failed=failed_count, ) return 0 if failed_count == 0 else 1 def _run_ai_flow( self, forecast: WeatherForecast, aggregated_alerts: list[AggregatedAlert], ) -> int: """Run the AI summarization flow. Args: forecast: The weather forecast data. aggregated_alerts: List of aggregated alerts. Returns: Exit code (0 for success, 1 for error). """ self.logger.info("step_ai_summary_flow") # Build forecast context forecast_context = self._build_forecast_context(forecast) # Detect changes from previous run change_report = ChangeReport() if self.change_detector: previous_snapshots = self.state_manager.get_previous_snapshots() change_report = self.change_detector.detect( aggregated_alerts, previous_snapshots ) # Try to generate AI summary try: assert self.ai_summary_service is not None summary = self.ai_summary_service.summarize( alerts=aggregated_alerts, change_report=change_report, forecast_context=forecast_context, location=self.config.weather.location, ) # Send summary notification self.logger.info("step_send_ai_summary") result = self.notification_service.send_summary(summary) if result.success: self.state_manager.record_ai_summary_sent() self._finalize(aggregated_alerts) self.logger.info( "app_complete_ai", summary_sent=True, alert_count=len(aggregated_alerts), ) return 0 else: self.logger.warning( "ai_summary_send_failed", error=result.error, fallback="individual_alerts", ) return self._run_standard_flow(aggregated_alerts) except AISummaryServiceError as e: self.logger.warning( "ai_summary_generation_failed", error=str(e), fallback="individual_alerts", ) return self._run_standard_flow(aggregated_alerts) def _build_forecast_context(self, forecast: WeatherForecast) -> ForecastContext: """Build forecast context from weather forecast data. Args: forecast: The weather forecast data. Returns: ForecastContext for AI summary. """ if not forecast.hourly_forecasts: return ForecastContext( start_time="N/A", end_time="N/A", min_temp=0, max_temp=0, max_wind_speed=0, max_wind_gust=0, max_precip_prob=0, conditions=[], ) temps = [h.temp for h in forecast.hourly_forecasts] wind_speeds = [h.wind_speed for h in forecast.hourly_forecasts] wind_gusts = [h.wind_gust for h in forecast.hourly_forecasts] precip_probs = [h.precip_prob for h in forecast.hourly_forecasts] conditions = [h.conditions for h in forecast.hourly_forecasts] return ForecastContext( start_time=forecast.hourly_forecasts[0].hour_key, end_time=forecast.hourly_forecasts[-1].hour_key, min_temp=min(temps), max_temp=max(temps), max_wind_speed=max(wind_speeds), max_wind_gust=max(wind_gusts), max_precip_prob=max(precip_probs), conditions=conditions, ) def _finalize(self, aggregated_alerts: list[AggregatedAlert]) -> None: """Finalize the run by saving state. Args: aggregated_alerts: Current alerts to save as snapshots. """ self.state_manager.save_alert_snapshots(aggregated_alerts) self.state_manager.purge_old_records() self.state_manager.save() def main(config_path: Optional[str] = None) -> int: """Main entry point for the application. Args: config_path: Optional path to configuration file. Returns: Exit code (0 for success, 1 for error). """ # Load configuration try: config = load_config(config_path) except Exception as e: print(f"Failed to load configuration: {e}", file=sys.stderr) return 1 # Configure logging configure_logging(config.app.log_level) logger = get_logger(__name__) # Validate required secrets if not config.weather.api_key: logger.error("missing_api_key", hint="Set VISUALCROSSING_API_KEY environment variable") return 1 if not config.notifications.ntfy.access_token: logger.warning( "missing_ntfy_token", hint="Set NTFY_ACCESS_TOKEN if your server requires auth", ) if config.ai.enabled and not config.ai.api_token: logger.warning( "ai_enabled_but_no_token", hint="Set REPLICATE_API_TOKEN to enable AI summaries", ) # Run the application app = WeatherAlertsApp(config) return app.run() if __name__ == "__main__": sys.exit(main())