349 lines
12 KiB
Python
349 lines
12 KiB
Python
"""Main orchestration module for weather alerts application."""
|
|
|
|
import sys
|
|
from typing import Optional
|
|
|
|
from app.config.loader import AppConfig, load_config
|
|
from app.models.ai_summary import ForecastContext
|
|
from app.models.alerts import AggregatedAlert
|
|
from app.models.weather import WeatherForecast
|
|
from app.services.ai_summary_service import AISummaryService, AISummaryServiceError
|
|
from app.services.alert_aggregator import AlertAggregator
|
|
from app.services.change_detector import ChangeDetector, ChangeReport
|
|
from app.services.notification_service import NotificationService
|
|
from app.services.rule_engine import RuleEngine
|
|
from app.services.state_manager import StateManager
|
|
from app.services.weather_service import WeatherService, WeatherServiceError
|
|
from app.utils.http_client import HttpClient
|
|
from app.utils.logging_config import configure_logging, get_logger
|
|
|
|
|
|
class WeatherAlertsApp:
|
|
"""Main application class for weather alerts."""
|
|
|
|
def __init__(self, config: AppConfig) -> None:
|
|
"""Initialize the application with configuration.
|
|
|
|
Args:
|
|
config: The application configuration.
|
|
"""
|
|
self.config = config
|
|
self.logger = get_logger(__name__)
|
|
|
|
# Initialize HTTP client (shared across services)
|
|
self.http_client = HttpClient()
|
|
|
|
# Initialize services
|
|
self.weather_service = WeatherService(
|
|
api_key=config.weather.api_key,
|
|
http_client=self.http_client,
|
|
)
|
|
|
|
self.rule_engine = RuleEngine(rules=config.alerts.rules)
|
|
|
|
self.state_manager = StateManager(
|
|
file_path=config.state.file_path,
|
|
dedup_window_hours=config.state.dedup_window_hours,
|
|
)
|
|
|
|
self.alert_aggregator = AlertAggregator()
|
|
|
|
self.notification_service = NotificationService(
|
|
server_url=config.notifications.ntfy.server_url,
|
|
topic=config.notifications.ntfy.topic,
|
|
access_token=config.notifications.ntfy.access_token,
|
|
priority=config.notifications.ntfy.priority,
|
|
default_tags=config.notifications.ntfy.tags,
|
|
http_client=self.http_client,
|
|
)
|
|
|
|
# Initialize AI services conditionally
|
|
self.ai_enabled = config.ai.enabled and bool(config.ai.api_token)
|
|
self.ai_summary_service: Optional[AISummaryService] = None
|
|
self.change_detector: Optional[ChangeDetector] = None
|
|
|
|
if self.ai_enabled:
|
|
self.ai_summary_service = AISummaryService(
|
|
api_token=config.ai.api_token,
|
|
model=config.ai.model,
|
|
api_timeout=config.ai.api_timeout,
|
|
max_tokens=config.ai.max_tokens,
|
|
http_client=self.http_client,
|
|
)
|
|
self.logger.info("ai_summary_enabled", model=config.ai.model)
|
|
|
|
if config.change_detection.enabled:
|
|
self.change_detector = ChangeDetector(
|
|
thresholds=config.change_detection.thresholds
|
|
)
|
|
|
|
def run(self) -> int:
|
|
"""Execute the main application flow.
|
|
|
|
Returns:
|
|
Exit code (0 for success, 1 for error).
|
|
"""
|
|
self.logger.info(
|
|
"app_starting",
|
|
version=self.config.app.version,
|
|
location=self.config.weather.location,
|
|
ai_enabled=self.ai_enabled,
|
|
)
|
|
|
|
try:
|
|
# Step 1: Fetch weather forecast
|
|
self.logger.info("step_fetch_forecast")
|
|
forecast = self.weather_service.get_forecast(
|
|
location=self.config.weather.location,
|
|
hours_ahead=self.config.weather.hours_ahead,
|
|
unit_group=self.config.weather.unit_group,
|
|
)
|
|
|
|
# Step 2: Evaluate rules against forecast
|
|
self.logger.info("step_evaluate_rules")
|
|
triggered_alerts = self.rule_engine.evaluate(forecast)
|
|
|
|
if not triggered_alerts:
|
|
self.logger.info("no_alerts_triggered")
|
|
# Clear snapshots when no alerts
|
|
self.state_manager.save_alert_snapshots([])
|
|
self.state_manager.save()
|
|
return 0
|
|
|
|
self.logger.info(
|
|
"alerts_triggered",
|
|
count=len(triggered_alerts),
|
|
)
|
|
|
|
# Step 2.5: Aggregate alerts by type
|
|
self.logger.info("step_aggregate_alerts")
|
|
aggregated_alerts = self.alert_aggregator.aggregate(triggered_alerts)
|
|
|
|
self.logger.info(
|
|
"alerts_aggregated",
|
|
input_count=len(triggered_alerts),
|
|
output_count=len(aggregated_alerts),
|
|
)
|
|
|
|
# Branch based on AI enabled
|
|
if self.ai_enabled:
|
|
return self._run_ai_flow(forecast, aggregated_alerts)
|
|
else:
|
|
return self._run_standard_flow(aggregated_alerts)
|
|
|
|
except WeatherServiceError as e:
|
|
self.logger.error("weather_service_error", error=str(e))
|
|
return 1
|
|
|
|
except Exception as e:
|
|
self.logger.exception("unexpected_error", error=str(e))
|
|
return 1
|
|
|
|
finally:
|
|
self.http_client.close()
|
|
|
|
def _run_standard_flow(self, aggregated_alerts: list[AggregatedAlert]) -> int:
|
|
"""Run the standard notification flow without AI.
|
|
|
|
Args:
|
|
aggregated_alerts: List of aggregated alerts.
|
|
|
|
Returns:
|
|
Exit code (0 for success, 1 for error).
|
|
"""
|
|
# Filter duplicates
|
|
self.logger.info("step_filter_duplicates")
|
|
new_alerts = self.state_manager.filter_duplicates(aggregated_alerts)
|
|
|
|
if not new_alerts:
|
|
self.logger.info("all_alerts_are_duplicates")
|
|
self._finalize(aggregated_alerts)
|
|
return 0
|
|
|
|
# Send notifications
|
|
self.logger.info(
|
|
"step_send_notifications",
|
|
count=len(new_alerts),
|
|
)
|
|
results = self.notification_service.send_batch(new_alerts)
|
|
|
|
# Record sent alerts
|
|
self.logger.info("step_record_sent")
|
|
for result in results:
|
|
if result.success:
|
|
self.state_manager.record_sent(result.alert)
|
|
|
|
self._finalize(aggregated_alerts)
|
|
|
|
# Report results
|
|
success_count = sum(1 for r in results if r.success)
|
|
failed_count = len(results) - success_count
|
|
|
|
self.logger.info(
|
|
"app_complete",
|
|
alerts_sent=success_count,
|
|
alerts_failed=failed_count,
|
|
)
|
|
|
|
return 0 if failed_count == 0 else 1
|
|
|
|
def _run_ai_flow(
|
|
self,
|
|
forecast: WeatherForecast,
|
|
aggregated_alerts: list[AggregatedAlert],
|
|
) -> int:
|
|
"""Run the AI summarization flow.
|
|
|
|
Args:
|
|
forecast: The weather forecast data.
|
|
aggregated_alerts: List of aggregated alerts.
|
|
|
|
Returns:
|
|
Exit code (0 for success, 1 for error).
|
|
"""
|
|
self.logger.info("step_ai_summary_flow")
|
|
|
|
# Build forecast context
|
|
forecast_context = self._build_forecast_context(forecast)
|
|
|
|
# Detect changes from previous run
|
|
change_report = ChangeReport()
|
|
if self.change_detector:
|
|
previous_snapshots = self.state_manager.get_previous_snapshots()
|
|
change_report = self.change_detector.detect(
|
|
aggregated_alerts, previous_snapshots
|
|
)
|
|
|
|
# Try to generate AI summary
|
|
try:
|
|
assert self.ai_summary_service is not None
|
|
summary = self.ai_summary_service.summarize(
|
|
alerts=aggregated_alerts,
|
|
change_report=change_report,
|
|
forecast_context=forecast_context,
|
|
location=self.config.weather.location,
|
|
)
|
|
|
|
# Send summary notification
|
|
self.logger.info("step_send_ai_summary")
|
|
result = self.notification_service.send_summary(summary)
|
|
|
|
if result.success:
|
|
self.state_manager.record_ai_summary_sent()
|
|
self._finalize(aggregated_alerts)
|
|
self.logger.info(
|
|
"app_complete_ai",
|
|
summary_sent=True,
|
|
alert_count=len(aggregated_alerts),
|
|
)
|
|
return 0
|
|
else:
|
|
self.logger.warning(
|
|
"ai_summary_send_failed",
|
|
error=result.error,
|
|
fallback="individual_alerts",
|
|
)
|
|
return self._run_standard_flow(aggregated_alerts)
|
|
|
|
except AISummaryServiceError as e:
|
|
self.logger.warning(
|
|
"ai_summary_generation_failed",
|
|
error=str(e),
|
|
fallback="individual_alerts",
|
|
)
|
|
return self._run_standard_flow(aggregated_alerts)
|
|
|
|
def _build_forecast_context(self, forecast: WeatherForecast) -> ForecastContext:
|
|
"""Build forecast context from weather forecast data.
|
|
|
|
Args:
|
|
forecast: The weather forecast data.
|
|
|
|
Returns:
|
|
ForecastContext for AI summary.
|
|
"""
|
|
if not forecast.hourly_forecasts:
|
|
return ForecastContext(
|
|
start_time="N/A",
|
|
end_time="N/A",
|
|
min_temp=0,
|
|
max_temp=0,
|
|
max_wind_speed=0,
|
|
max_wind_gust=0,
|
|
max_precip_prob=0,
|
|
conditions=[],
|
|
)
|
|
|
|
temps = [h.temp for h in forecast.hourly_forecasts]
|
|
wind_speeds = [h.wind_speed for h in forecast.hourly_forecasts]
|
|
wind_gusts = [h.wind_gust for h in forecast.hourly_forecasts]
|
|
precip_probs = [h.precip_prob for h in forecast.hourly_forecasts]
|
|
conditions = [h.conditions for h in forecast.hourly_forecasts]
|
|
|
|
return ForecastContext(
|
|
start_time=forecast.hourly_forecasts[0].hour_key,
|
|
end_time=forecast.hourly_forecasts[-1].hour_key,
|
|
min_temp=min(temps),
|
|
max_temp=max(temps),
|
|
max_wind_speed=max(wind_speeds),
|
|
max_wind_gust=max(wind_gusts),
|
|
max_precip_prob=max(precip_probs),
|
|
conditions=conditions,
|
|
)
|
|
|
|
def _finalize(self, aggregated_alerts: list[AggregatedAlert]) -> None:
|
|
"""Finalize the run by saving state.
|
|
|
|
Args:
|
|
aggregated_alerts: Current alerts to save as snapshots.
|
|
"""
|
|
self.state_manager.save_alert_snapshots(aggregated_alerts)
|
|
self.state_manager.purge_old_records()
|
|
self.state_manager.save()
|
|
|
|
|
|
def main(config_path: Optional[str] = None) -> int:
|
|
"""Main entry point for the application.
|
|
|
|
Args:
|
|
config_path: Optional path to configuration file.
|
|
|
|
Returns:
|
|
Exit code (0 for success, 1 for error).
|
|
"""
|
|
# Load configuration
|
|
try:
|
|
config = load_config(config_path)
|
|
except Exception as e:
|
|
print(f"Failed to load configuration: {e}", file=sys.stderr)
|
|
return 1
|
|
|
|
# Configure logging
|
|
configure_logging(config.app.log_level)
|
|
logger = get_logger(__name__)
|
|
|
|
# Validate required secrets
|
|
if not config.weather.api_key:
|
|
logger.error("missing_api_key", hint="Set VISUALCROSSING_API_KEY environment variable")
|
|
return 1
|
|
|
|
if not config.notifications.ntfy.access_token:
|
|
logger.warning(
|
|
"missing_ntfy_token",
|
|
hint="Set NTFY_ACCESS_TOKEN if your server requires auth",
|
|
)
|
|
|
|
if config.ai.enabled and not config.ai.api_token:
|
|
logger.warning(
|
|
"ai_enabled_but_no_token",
|
|
hint="Set REPLICATE_API_TOKEN to enable AI summaries",
|
|
)
|
|
|
|
# Run the application
|
|
app = WeatherAlertsApp(config)
|
|
return app.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|