176 lines
5.8 KiB
Python
176 lines
5.8 KiB
Python
"""Tests for the state manager."""
|
|
|
|
import json
|
|
import tempfile
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from app.models.alerts import AlertType, TriggeredAlert
|
|
from app.services.state_manager import StateManager
|
|
|
|
|
|
def make_triggered_alert(
|
|
alert_type: AlertType = AlertType.TEMPERATURE_LOW,
|
|
forecast_hour: str = "2024-01-15-12",
|
|
) -> TriggeredAlert:
|
|
"""Create a test triggered alert."""
|
|
return TriggeredAlert(
|
|
alert_type=alert_type,
|
|
title="Test Alert",
|
|
message="Test message",
|
|
forecast_hour=forecast_hour,
|
|
value=25.0,
|
|
threshold=32.0,
|
|
)
|
|
|
|
|
|
class TestStateManager:
|
|
"""Test state manager functionality."""
|
|
|
|
def test_load_creates_empty_state_if_no_file(self, tmp_path: Path) -> None:
|
|
"""Test that loading from nonexistent file creates empty state."""
|
|
state_file = tmp_path / "state.json"
|
|
manager = StateManager(str(state_file))
|
|
|
|
state = manager.load()
|
|
|
|
assert len(state.sent_alerts) == 0
|
|
|
|
def test_save_creates_file(self, tmp_path: Path) -> None:
|
|
"""Test that saving creates the state file."""
|
|
state_file = tmp_path / "subdir" / "state.json"
|
|
manager = StateManager(str(state_file))
|
|
|
|
alert = make_triggered_alert()
|
|
manager.record_sent(alert)
|
|
manager.save()
|
|
|
|
assert state_file.exists()
|
|
|
|
def test_save_and_load_round_trip(self, tmp_path: Path) -> None:
|
|
"""Test that state survives save/load cycle."""
|
|
state_file = tmp_path / "state.json"
|
|
manager = StateManager(str(state_file))
|
|
|
|
alert = make_triggered_alert()
|
|
manager.record_sent(alert)
|
|
manager.save()
|
|
|
|
# Create new manager and load
|
|
manager2 = StateManager(str(state_file))
|
|
state = manager2.load()
|
|
|
|
assert len(state.sent_alerts) == 1
|
|
assert alert.dedup_key in state.sent_alerts
|
|
|
|
def test_filter_duplicates_removes_known_alerts(self, tmp_path: Path) -> None:
|
|
"""Test that known alerts are filtered out."""
|
|
state_file = tmp_path / "state.json"
|
|
manager = StateManager(str(state_file))
|
|
|
|
# Record an alert as sent
|
|
alert1 = make_triggered_alert(forecast_hour="2024-01-15-12")
|
|
manager.record_sent(alert1)
|
|
|
|
# Try to send the same alert again plus a new one
|
|
alert2 = make_triggered_alert(forecast_hour="2024-01-15-12") # Duplicate
|
|
alert3 = make_triggered_alert(forecast_hour="2024-01-15-13") # New
|
|
|
|
filtered = manager.filter_duplicates([alert2, alert3])
|
|
|
|
assert len(filtered) == 1
|
|
assert filtered[0].forecast_hour == "2024-01-15-13"
|
|
|
|
def test_different_alert_types_not_duplicates(self, tmp_path: Path) -> None:
|
|
"""Test that different alert types for same hour aren't duplicates."""
|
|
state_file = tmp_path / "state.json"
|
|
manager = StateManager(str(state_file))
|
|
|
|
alert1 = make_triggered_alert(
|
|
alert_type=AlertType.TEMPERATURE_LOW, forecast_hour="2024-01-15-12"
|
|
)
|
|
manager.record_sent(alert1)
|
|
|
|
alert2 = make_triggered_alert(
|
|
alert_type=AlertType.PRECIPITATION, forecast_hour="2024-01-15-12"
|
|
)
|
|
|
|
filtered = manager.filter_duplicates([alert2])
|
|
|
|
assert len(filtered) == 1
|
|
|
|
def test_purge_old_records(self, tmp_path: Path) -> None:
|
|
"""Test that old records are purged."""
|
|
state_file = tmp_path / "state.json"
|
|
manager = StateManager(str(state_file), dedup_window_hours=24)
|
|
|
|
# Manually create an old record
|
|
alert = make_triggered_alert()
|
|
manager.state.sent_alerts[alert.dedup_key] = manager.state.sent_alerts.get(
|
|
alert.dedup_key
|
|
) or type(
|
|
"SentAlertRecord",
|
|
(),
|
|
{
|
|
"dedup_key": alert.dedup_key,
|
|
"alert_type": alert.alert_type.value,
|
|
"sent_at": datetime.now() - timedelta(hours=48),
|
|
"forecast_hour": alert.forecast_hour,
|
|
"to_dict": lambda self: {
|
|
"dedup_key": self.dedup_key,
|
|
"alert_type": self.alert_type,
|
|
"sent_at": self.sent_at.isoformat(),
|
|
"forecast_hour": self.forecast_hour,
|
|
},
|
|
},
|
|
)()
|
|
|
|
# Use the actual model
|
|
from app.models.state import SentAlertRecord
|
|
|
|
manager.state.sent_alerts[alert.dedup_key] = SentAlertRecord(
|
|
dedup_key=alert.dedup_key,
|
|
alert_type=alert.alert_type.value,
|
|
sent_at=datetime.now() - timedelta(hours=48),
|
|
forecast_hour=alert.forecast_hour,
|
|
)
|
|
|
|
purged = manager.purge_old_records()
|
|
|
|
assert purged == 1
|
|
assert len(manager.state.sent_alerts) == 0
|
|
|
|
def test_atomic_write_creates_temp_file(self, tmp_path: Path) -> None:
|
|
"""Test that atomic write doesn't leave temp files on success."""
|
|
state_file = tmp_path / "state.json"
|
|
manager = StateManager(str(state_file))
|
|
|
|
alert = make_triggered_alert()
|
|
manager.record_sent(alert)
|
|
manager.save()
|
|
|
|
# Check no temp files remain
|
|
temp_files = list(tmp_path.glob("state_*.tmp"))
|
|
assert len(temp_files) == 0
|
|
|
|
def test_load_handles_corrupt_json(self, tmp_path: Path) -> None:
|
|
"""Test that corrupt JSON is handled gracefully."""
|
|
state_file = tmp_path / "state.json"
|
|
state_file.write_text("not valid json {{{")
|
|
|
|
manager = StateManager(str(state_file))
|
|
state = manager.load()
|
|
|
|
# Should return empty state
|
|
assert len(state.sent_alerts) == 0
|
|
|
|
def test_dedup_key_format(self) -> None:
|
|
"""Test that dedup key has expected format."""
|
|
alert = make_triggered_alert(
|
|
alert_type=AlertType.TEMPERATURE_LOW, forecast_hour="2024-01-15-12"
|
|
)
|
|
|
|
assert alert.dedup_key == "temperature_low:2024-01-15-12"
|