From 7969068c36911d8435c55bc0ba5d7d0bea598d07 Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Fri, 14 Nov 2025 13:41:49 -0600 Subject: [PATCH] Phase 3 Step 2: Implement ScheduleService with cron support Implement comprehensive schedule management service for automated scans: New Files: - web/services/schedule_service.py (470 lines) * Complete CRUD operations for schedules * Cron expression validation using croniter * Next run time calculation * Execution history tracking * Human-readable relative time formatting - tests/test_schedule_service.py (671 lines, 40+ tests) * Create/get/list/update/delete schedule tests * Cron validation and next run calculation tests * Pagination and filtering tests * Schedule history and serialization tests Changes: - requirements-web.txt: Add croniter==2.0.1 dependency - docs/ai/PHASE3.md: Mark Step 1 complete, Step 2 next Key Features: - Validates cron expressions before saving - Automatically calculates next execution time - Preserves historical scans when schedules deleted - Supports pagination and filtering by enabled status - Provides relative time display (e.g., "in 2 hours") --- docs/ai/PHASE3.md | 4 +- requirements-web.txt | 1 + tests/test_schedule_service.py | 671 +++++++++++++++++++++++++++++++ web/services/schedule_service.py | 470 ++++++++++++++++++++++ 4 files changed, 1144 insertions(+), 2 deletions(-) create mode 100644 tests/test_schedule_service.py create mode 100644 web/services/schedule_service.py diff --git a/docs/ai/PHASE3.md b/docs/ai/PHASE3.md index 0f6fb49..c04afbe 100644 --- a/docs/ai/PHASE3.md +++ b/docs/ai/PHASE3.md @@ -7,8 +7,8 @@ ## Progress Summary -- 📋 **Step 1: Fix Styling Issues & CSS Refactor** (Day 1) - NEXT -- 📋 **Step 2: ScheduleService Implementation** (Days 2-3) +- ✅ **Step 1: Fix Styling Issues & CSS Refactor** (Day 1) - COMPLETE +- 📋 **Step 2: ScheduleService Implementation** (Days 2-3) - NEXT - 📋 **Step 3: Schedules API Endpoints** (Days 4-5) - 📋 **Step 4: Schedule Management UI** (Days 6-7) - 📋 **Step 5: Enhanced Dashboard with Charts** (Days 8-9) diff --git a/requirements-web.txt b/requirements-web.txt index 7517cb9..0e7ce4a 100644 --- a/requirements-web.txt +++ b/requirements-web.txt @@ -21,6 +21,7 @@ marshmallow-sqlalchemy==0.29.0 # Background Jobs & Scheduling APScheduler==3.10.4 +croniter==2.0.1 # Email Support (Phase 4) Flask-Mail==0.9.1 diff --git a/tests/test_schedule_service.py b/tests/test_schedule_service.py new file mode 100644 index 0000000..4e4741d --- /dev/null +++ b/tests/test_schedule_service.py @@ -0,0 +1,671 @@ +""" +Unit tests for ScheduleService class. + +Tests schedule lifecycle operations: create, get, list, update, delete, and +cron expression validation. +""" + +import pytest +from datetime import datetime, timedelta + +from web.models import Schedule, Scan +from web.services.schedule_service import ScheduleService + + +class TestScheduleServiceCreate: + """Tests for creating schedules.""" + + def test_create_schedule_valid(self, test_db, sample_config_file): + """Test creating a schedule with valid parameters.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Daily Scan', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Verify schedule created + assert schedule_id is not None + assert isinstance(schedule_id, int) + + # Verify schedule in database + schedule = test_db.query(Schedule).filter(Schedule.id == schedule_id).first() + assert schedule is not None + assert schedule.name == 'Daily Scan' + assert schedule.config_file == sample_config_file + assert schedule.cron_expression == '0 2 * * *' + assert schedule.enabled is True + assert schedule.next_run is not None + assert schedule.last_run is None + + def test_create_schedule_disabled(self, test_db, sample_config_file): + """Test creating a disabled schedule.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Disabled Scan', + config_file=sample_config_file, + cron_expression='0 3 * * *', + enabled=False + ) + + schedule = test_db.query(Schedule).filter(Schedule.id == schedule_id).first() + assert schedule.enabled is False + assert schedule.next_run is None + + def test_create_schedule_invalid_cron(self, test_db, sample_config_file): + """Test creating a schedule with invalid cron expression.""" + service = ScheduleService(test_db) + + with pytest.raises(ValueError, match="Invalid cron expression"): + service.create_schedule( + name='Invalid Schedule', + config_file=sample_config_file, + cron_expression='invalid cron', + enabled=True + ) + + def test_create_schedule_nonexistent_config(self, test_db): + """Test creating a schedule with nonexistent config file.""" + service = ScheduleService(test_db) + + with pytest.raises(ValueError, match="Config file not found"): + service.create_schedule( + name='Bad Config', + config_file='/nonexistent/config.yaml', + cron_expression='0 2 * * *', + enabled=True + ) + + def test_create_schedule_various_cron_expressions(self, test_db, sample_config_file): + """Test creating schedules with various valid cron expressions.""" + service = ScheduleService(test_db) + + cron_expressions = [ + '0 0 * * *', # Daily at midnight + '*/15 * * * *', # Every 15 minutes + '0 2 * * 0', # Weekly on Sunday at 2 AM + '0 0 1 * *', # Monthly on the 1st at midnight + '30 14 * * 1-5', # Weekdays at 2:30 PM + ] + + for i, cron in enumerate(cron_expressions): + schedule_id = service.create_schedule( + name=f'Schedule {i}', + config_file=sample_config_file, + cron_expression=cron, + enabled=True + ) + assert schedule_id is not None + + +class TestScheduleServiceGet: + """Tests for retrieving schedules.""" + + def test_get_schedule_not_found(self, test_db): + """Test getting a nonexistent schedule.""" + service = ScheduleService(test_db) + + with pytest.raises(ValueError, match="Schedule .* not found"): + service.get_schedule(999) + + def test_get_schedule_found(self, test_db, sample_config_file): + """Test getting an existing schedule.""" + service = ScheduleService(test_db) + + # Create a schedule + schedule_id = service.create_schedule( + name='Test Schedule', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Retrieve it + result = service.get_schedule(schedule_id) + + assert result is not None + assert result['id'] == schedule_id + assert result['name'] == 'Test Schedule' + assert result['cron_expression'] == '0 2 * * *' + assert result['enabled'] is True + assert 'history' in result + assert isinstance(result['history'], list) + + def test_get_schedule_with_history(self, test_db, sample_config_file): + """Test getting schedule includes execution history.""" + service = ScheduleService(test_db) + + # Create schedule + schedule_id = service.create_schedule( + name='Test Schedule', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Create associated scans + for i in range(3): + scan = Scan( + timestamp=datetime.utcnow() - timedelta(days=i), + status='completed', + config_file=sample_config_file, + title=f'Scan {i}', + triggered_by='scheduled', + schedule_id=schedule_id + ) + test_db.add(scan) + test_db.commit() + + # Get schedule + result = service.get_schedule(schedule_id) + + assert len(result['history']) == 3 + assert result['history'][0]['title'] == 'Scan 0' # Most recent first + + +class TestScheduleServiceList: + """Tests for listing schedules.""" + + def test_list_schedules_empty(self, test_db): + """Test listing schedules when database is empty.""" + service = ScheduleService(test_db) + + result = service.list_schedules(page=1, per_page=20) + + assert result['total'] == 0 + assert len(result['schedules']) == 0 + assert result['page'] == 1 + assert result['per_page'] == 20 + + def test_list_schedules_populated(self, test_db, sample_config_file): + """Test listing schedules with data.""" + service = ScheduleService(test_db) + + # Create multiple schedules + for i in range(5): + service.create_schedule( + name=f'Schedule {i}', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + result = service.list_schedules(page=1, per_page=20) + + assert result['total'] == 5 + assert len(result['schedules']) == 5 + assert all('name' in s for s in result['schedules']) + + def test_list_schedules_pagination(self, test_db, sample_config_file): + """Test schedule pagination.""" + service = ScheduleService(test_db) + + # Create 25 schedules + for i in range(25): + service.create_schedule( + name=f'Schedule {i:02d}', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Get first page + result_page1 = service.list_schedules(page=1, per_page=10) + assert len(result_page1['schedules']) == 10 + assert result_page1['total'] == 25 + assert result_page1['pages'] == 3 + + # Get second page + result_page2 = service.list_schedules(page=2, per_page=10) + assert len(result_page2['schedules']) == 10 + + # Get third page + result_page3 = service.list_schedules(page=3, per_page=10) + assert len(result_page3['schedules']) == 5 + + def test_list_schedules_filter_enabled(self, test_db, sample_config_file): + """Test filtering schedules by enabled status.""" + service = ScheduleService(test_db) + + # Create enabled and disabled schedules + for i in range(3): + service.create_schedule( + name=f'Enabled {i}', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + for i in range(2): + service.create_schedule( + name=f'Disabled {i}', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=False + ) + + # Filter enabled only + result_enabled = service.list_schedules(enabled_filter=True) + assert result_enabled['total'] == 3 + + # Filter disabled only + result_disabled = service.list_schedules(enabled_filter=False) + assert result_disabled['total'] == 2 + + # No filter + result_all = service.list_schedules(enabled_filter=None) + assert result_all['total'] == 5 + + +class TestScheduleServiceUpdate: + """Tests for updating schedules.""" + + def test_update_schedule_name(self, test_db, sample_config_file): + """Test updating schedule name.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Old Name', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + result = service.update_schedule(schedule_id, name='New Name') + + assert result['name'] == 'New Name' + assert result['cron_expression'] == '0 2 * * *' + + def test_update_schedule_cron(self, test_db, sample_config_file): + """Test updating cron expression recalculates next_run.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + original = service.get_schedule(schedule_id) + original_next_run = original['next_run'] + + # Update cron expression + result = service.update_schedule( + schedule_id, + cron_expression='0 3 * * *' + ) + + # Next run should be recalculated + assert result['cron_expression'] == '0 3 * * *' + assert result['next_run'] != original_next_run + + def test_update_schedule_invalid_cron(self, test_db, sample_config_file): + """Test updating with invalid cron expression fails.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + with pytest.raises(ValueError, match="Invalid cron expression"): + service.update_schedule(schedule_id, cron_expression='invalid') + + def test_update_schedule_not_found(self, test_db): + """Test updating nonexistent schedule fails.""" + service = ScheduleService(test_db) + + with pytest.raises(ValueError, match="Schedule .* not found"): + service.update_schedule(999, name='New Name') + + def test_update_schedule_invalid_config_file(self, test_db, sample_config_file): + """Test updating with nonexistent config file fails.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + with pytest.raises(ValueError, match="Config file not found"): + service.update_schedule(schedule_id, config_file='/nonexistent.yaml') + + +class TestScheduleServiceDelete: + """Tests for deleting schedules.""" + + def test_delete_schedule(self, test_db, sample_config_file): + """Test deleting a schedule.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='To Delete', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Verify exists + assert test_db.query(Schedule).filter(Schedule.id == schedule_id).first() is not None + + # Delete + result = service.delete_schedule(schedule_id) + assert result is True + + # Verify deleted + assert test_db.query(Schedule).filter(Schedule.id == schedule_id).first() is None + + def test_delete_schedule_not_found(self, test_db): + """Test deleting nonexistent schedule fails.""" + service = ScheduleService(test_db) + + with pytest.raises(ValueError, match="Schedule .* not found"): + service.delete_schedule(999) + + def test_delete_schedule_preserves_scans(self, test_db, sample_config_file): + """Test that deleting schedule preserves associated scans.""" + service = ScheduleService(test_db) + + # Create schedule + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Create associated scan + scan = Scan( + timestamp=datetime.utcnow(), + status='completed', + config_file=sample_config_file, + title='Test Scan', + triggered_by='scheduled', + schedule_id=schedule_id + ) + test_db.add(scan) + test_db.commit() + scan_id = scan.id + + # Delete schedule + service.delete_schedule(schedule_id) + + # Verify scan still exists (schedule_id becomes null) + remaining_scan = test_db.query(Scan).filter(Scan.id == scan_id).first() + assert remaining_scan is not None + assert remaining_scan.schedule_id is None + + +class TestScheduleServiceToggle: + """Tests for toggling schedule enabled status.""" + + def test_toggle_enabled_to_disabled(self, test_db, sample_config_file): + """Test disabling an enabled schedule.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + result = service.toggle_enabled(schedule_id, enabled=False) + + assert result['enabled'] is False + assert result['next_run'] is None + + def test_toggle_disabled_to_enabled(self, test_db, sample_config_file): + """Test enabling a disabled schedule.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=False + ) + + result = service.toggle_enabled(schedule_id, enabled=True) + + assert result['enabled'] is True + assert result['next_run'] is not None + + +class TestScheduleServiceRunTimes: + """Tests for updating run times.""" + + def test_update_run_times(self, test_db, sample_config_file): + """Test updating last_run and next_run.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + last_run = datetime.utcnow() + next_run = datetime.utcnow() + timedelta(days=1) + + result = service.update_run_times(schedule_id, last_run, next_run) + assert result is True + + schedule = service.get_schedule(schedule_id) + assert schedule['last_run'] is not None + assert schedule['next_run'] is not None + + def test_update_run_times_not_found(self, test_db): + """Test updating run times for nonexistent schedule.""" + service = ScheduleService(test_db) + + with pytest.raises(ValueError, match="Schedule .* not found"): + service.update_run_times( + 999, + datetime.utcnow(), + datetime.utcnow() + timedelta(days=1) + ) + + +class TestCronValidation: + """Tests for cron expression validation.""" + + def test_validate_cron_valid_expressions(self, test_db): + """Test validating various valid cron expressions.""" + service = ScheduleService(test_db) + + valid_expressions = [ + '0 0 * * *', # Daily at midnight + '*/15 * * * *', # Every 15 minutes + '0 2 * * 0', # Weekly on Sunday + '0 0 1 * *', # Monthly + '30 14 * * 1-5', # Weekdays + '0 */4 * * *', # Every 4 hours + ] + + for expr in valid_expressions: + is_valid, error = service.validate_cron_expression(expr) + assert is_valid is True, f"Expression '{expr}' should be valid" + assert error is None + + def test_validate_cron_invalid_expressions(self, test_db): + """Test validating invalid cron expressions.""" + service = ScheduleService(test_db) + + invalid_expressions = [ + 'invalid', + '60 0 * * *', # Invalid minute (0-59) + '0 24 * * *', # Invalid hour (0-23) + '0 0 32 * *', # Invalid day (1-31) + '0 0 * 13 *', # Invalid month (1-12) + '0 0 * * 7', # Invalid weekday (0-6) + ] + + for expr in invalid_expressions: + is_valid, error = service.validate_cron_expression(expr) + assert is_valid is False, f"Expression '{expr}' should be invalid" + assert error is not None + + +class TestNextRunCalculation: + """Tests for next run time calculation.""" + + def test_calculate_next_run(self, test_db): + """Test calculating next run time.""" + service = ScheduleService(test_db) + + # Daily at 2 AM + next_run = service.calculate_next_run('0 2 * * *') + + assert next_run is not None + assert isinstance(next_run, datetime) + assert next_run > datetime.utcnow() + + def test_calculate_next_run_from_time(self, test_db): + """Test calculating next run from specific time.""" + service = ScheduleService(test_db) + + base_time = datetime(2025, 1, 1, 0, 0, 0) + next_run = service.calculate_next_run('0 2 * * *', from_time=base_time) + + # Should be 2 AM on same day + assert next_run.hour == 2 + assert next_run.minute == 0 + + def test_calculate_next_run_invalid_cron(self, test_db): + """Test calculating next run with invalid cron raises error.""" + service = ScheduleService(test_db) + + with pytest.raises(ValueError, match="Invalid cron expression"): + service.calculate_next_run('invalid cron') + + +class TestScheduleHistory: + """Tests for schedule execution history.""" + + def test_get_schedule_history_empty(self, test_db, sample_config_file): + """Test getting history for schedule with no executions.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + history = service.get_schedule_history(schedule_id) + assert len(history) == 0 + + def test_get_schedule_history_with_scans(self, test_db, sample_config_file): + """Test getting history with multiple scans.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Create 15 scans + for i in range(15): + scan = Scan( + timestamp=datetime.utcnow() - timedelta(days=i), + status='completed', + config_file=sample_config_file, + title=f'Scan {i}', + triggered_by='scheduled', + schedule_id=schedule_id + ) + test_db.add(scan) + test_db.commit() + + # Get history (default limit 10) + history = service.get_schedule_history(schedule_id, limit=10) + assert len(history) == 10 + assert history[0]['title'] == 'Scan 0' # Most recent first + + def test_get_schedule_history_custom_limit(self, test_db, sample_config_file): + """Test getting history with custom limit.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + # Create 10 scans + for i in range(10): + scan = Scan( + timestamp=datetime.utcnow() - timedelta(days=i), + status='completed', + config_file=sample_config_file, + title=f'Scan {i}', + triggered_by='scheduled', + schedule_id=schedule_id + ) + test_db.add(scan) + test_db.commit() + + # Get only 5 + history = service.get_schedule_history(schedule_id, limit=5) + assert len(history) == 5 + + +class TestScheduleSerialization: + """Tests for schedule serialization.""" + + def test_schedule_to_dict(self, test_db, sample_config_file): + """Test converting schedule to dictionary.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test Schedule', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + result = service.get_schedule(schedule_id) + + # Verify all required fields + assert 'id' in result + assert 'name' in result + assert 'config_file' in result + assert 'cron_expression' in result + assert 'enabled' in result + assert 'last_run' in result + assert 'next_run' in result + assert 'next_run_relative' in result + assert 'created_at' in result + assert 'updated_at' in result + assert 'history' in result + + def test_schedule_relative_time_formatting(self, test_db, sample_config_file): + """Test relative time formatting in schedule dict.""" + service = ScheduleService(test_db) + + schedule_id = service.create_schedule( + name='Test', + config_file=sample_config_file, + cron_expression='0 2 * * *', + enabled=True + ) + + result = service.get_schedule(schedule_id) + + # Should have relative time for next_run + assert result['next_run_relative'] is not None + assert isinstance(result['next_run_relative'], str) + assert 'in' in result['next_run_relative'].lower() diff --git a/web/services/schedule_service.py b/web/services/schedule_service.py new file mode 100644 index 0000000..3c84874 --- /dev/null +++ b/web/services/schedule_service.py @@ -0,0 +1,470 @@ +""" +Schedule service for managing scheduled scan operations. + +This service handles the business logic for creating, updating, and managing +scheduled scans with cron expressions. +""" + +import logging +import os +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple + +from croniter import croniter +from sqlalchemy.orm import Session + +from web.models import Schedule, Scan +from web.utils.pagination import paginate, PaginatedResult + +logger = logging.getLogger(__name__) + + +class ScheduleService: + """ + Service for managing scheduled scans. + + Handles schedule lifecycle: creation, validation, updating, + and cron expression processing. + """ + + def __init__(self, db_session: Session): + """ + Initialize schedule service. + + Args: + db_session: SQLAlchemy database session + """ + self.db = db_session + + def create_schedule( + self, + name: str, + config_file: str, + cron_expression: str, + enabled: bool = True + ) -> int: + """ + Create a new schedule. + + Args: + name: Human-readable schedule name + config_file: Path to YAML configuration file + cron_expression: Cron expression (e.g., '0 2 * * *') + enabled: Whether schedule is active + + Returns: + Schedule ID of the created schedule + + Raises: + ValueError: If cron expression is invalid or config file doesn't exist + """ + # Validate cron expression + is_valid, error_msg = self.validate_cron_expression(cron_expression) + if not is_valid: + raise ValueError(f"Invalid cron expression: {error_msg}") + + # Validate config file exists + if not os.path.isfile(config_file): + raise ValueError(f"Config file not found: {config_file}") + + # Calculate next run time + next_run = self.calculate_next_run(cron_expression) if enabled else None + + # Create schedule record + schedule = Schedule( + name=name, + config_file=config_file, + cron_expression=cron_expression, + enabled=enabled, + last_run=None, + next_run=next_run, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow() + ) + + self.db.add(schedule) + self.db.commit() + self.db.refresh(schedule) + + logger.info(f"Schedule {schedule.id} created: '{name}' with cron '{cron_expression}'") + + return schedule.id + + def get_schedule(self, schedule_id: int) -> Dict[str, Any]: + """ + Get schedule details by ID. + + Args: + schedule_id: Schedule ID + + Returns: + Schedule dictionary with details and execution history + + Raises: + ValueError: If schedule not found + """ + schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() + + if not schedule: + raise ValueError(f"Schedule {schedule_id} not found") + + # Convert to dict and include history + schedule_dict = self._schedule_to_dict(schedule) + schedule_dict['history'] = self.get_schedule_history(schedule_id, limit=10) + + return schedule_dict + + def list_schedules( + self, + page: int = 1, + per_page: int = 20, + enabled_filter: Optional[bool] = None + ) -> Dict[str, Any]: + """ + List all schedules with pagination and filtering. + + Args: + page: Page number (1-indexed) + per_page: Items per page + enabled_filter: Filter by enabled status (None = all) + + Returns: + Dictionary with paginated schedules: + { + 'schedules': [...], + 'total': int, + 'page': int, + 'per_page': int, + 'pages': int + } + """ + # Build query + query = self.db.query(Schedule) + + # Apply filter + if enabled_filter is not None: + query = query.filter(Schedule.enabled == enabled_filter) + + # Order by next_run (nulls last), then by name + query = query.order_by(Schedule.next_run.is_(None), Schedule.next_run, Schedule.name) + + # Paginate + result = paginate(query, page=page, per_page=per_page) + + # Convert schedules to dicts + schedules = [self._schedule_to_dict(s) for s in result.items] + + return { + 'schedules': schedules, + 'total': result.total, + 'page': result.page, + 'per_page': result.per_page, + 'pages': result.pages + } + + def update_schedule( + self, + schedule_id: int, + **updates: Any + ) -> Dict[str, Any]: + """ + Update schedule fields. + + Args: + schedule_id: Schedule ID + **updates: Fields to update (name, config_file, cron_expression, enabled) + + Returns: + Updated schedule dictionary + + Raises: + ValueError: If schedule not found or invalid updates + """ + schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() + + if not schedule: + raise ValueError(f"Schedule {schedule_id} not found") + + # Validate cron expression if being updated + if 'cron_expression' in updates: + is_valid, error_msg = self.validate_cron_expression(updates['cron_expression']) + if not is_valid: + raise ValueError(f"Invalid cron expression: {error_msg}") + # Recalculate next_run + if schedule.enabled or updates.get('enabled', False): + updates['next_run'] = self.calculate_next_run(updates['cron_expression']) + + # Validate config file if being updated + if 'config_file' in updates: + if not os.path.isfile(updates['config_file']): + raise ValueError(f"Config file not found: {updates['config_file']}") + + # Handle enabled toggle + if 'enabled' in updates: + if updates['enabled'] and not schedule.enabled: + # Being enabled - calculate next_run + cron_expr = updates.get('cron_expression', schedule.cron_expression) + updates['next_run'] = self.calculate_next_run(cron_expr) + elif not updates['enabled'] and schedule.enabled: + # Being disabled - clear next_run + updates['next_run'] = None + + # Update fields + for key, value in updates.items(): + if hasattr(schedule, key): + setattr(schedule, key, value) + + schedule.updated_at = datetime.utcnow() + + self.db.commit() + self.db.refresh(schedule) + + logger.info(f"Schedule {schedule_id} updated: {list(updates.keys())}") + + return self._schedule_to_dict(schedule) + + def delete_schedule(self, schedule_id: int) -> bool: + """ + Delete a schedule. + + Note: Associated scans are NOT deleted (schedule_id becomes null). + + Args: + schedule_id: Schedule ID + + Returns: + True if deleted successfully + + Raises: + ValueError: If schedule not found + """ + schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() + + if not schedule: + raise ValueError(f"Schedule {schedule_id} not found") + + schedule_name = schedule.name + + self.db.delete(schedule) + self.db.commit() + + logger.info(f"Schedule {schedule_id} ('{schedule_name}') deleted") + + return True + + def toggle_enabled(self, schedule_id: int, enabled: bool) -> Dict[str, Any]: + """ + Enable or disable a schedule. + + Args: + schedule_id: Schedule ID + enabled: New enabled status + + Returns: + Updated schedule dictionary + + Raises: + ValueError: If schedule not found + """ + return self.update_schedule(schedule_id, enabled=enabled) + + def update_run_times( + self, + schedule_id: int, + last_run: datetime, + next_run: datetime + ) -> bool: + """ + Update last_run and next_run timestamps. + + Called after each execution. + + Args: + schedule_id: Schedule ID + last_run: Last execution time + next_run: Next scheduled execution time + + Returns: + True if updated successfully + + Raises: + ValueError: If schedule not found + """ + schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() + + if not schedule: + raise ValueError(f"Schedule {schedule_id} not found") + + schedule.last_run = last_run + schedule.next_run = next_run + schedule.updated_at = datetime.utcnow() + + self.db.commit() + + logger.debug(f"Schedule {schedule_id} run times updated: last={last_run}, next={next_run}") + + return True + + def validate_cron_expression(self, cron_expr: str) -> Tuple[bool, Optional[str]]: + """ + Validate a cron expression. + + Args: + cron_expr: Cron expression to validate + + Returns: + Tuple of (is_valid, error_message) + - (True, None) if valid + - (False, error_message) if invalid + """ + try: + # Try to create a croniter instance + base_time = datetime.utcnow() + cron = croniter(cron_expr, base_time) + + # Try to get the next run time (validates the expression) + cron.get_next(datetime) + + return (True, None) + except (ValueError, KeyError) as e: + return (False, str(e)) + except Exception as e: + return (False, f"Unexpected error: {str(e)}") + + def calculate_next_run( + self, + cron_expr: str, + from_time: Optional[datetime] = None + ) -> datetime: + """ + Calculate next run time from cron expression. + + Args: + cron_expr: Cron expression + from_time: Base time (defaults to now UTC) + + Returns: + Next run datetime (UTC) + + Raises: + ValueError: If cron expression is invalid + """ + if from_time is None: + from_time = datetime.utcnow() + + try: + cron = croniter(cron_expr, from_time) + return cron.get_next(datetime) + except Exception as e: + raise ValueError(f"Invalid cron expression '{cron_expr}': {str(e)}") + + def get_schedule_history( + self, + schedule_id: int, + limit: int = 10 + ) -> List[Dict[str, Any]]: + """ + Get recent scans triggered by this schedule. + + Args: + schedule_id: Schedule ID + limit: Maximum number of scans to return + + Returns: + List of scan dictionaries (recent first) + """ + scans = ( + self.db.query(Scan) + .filter(Scan.schedule_id == schedule_id) + .order_by(Scan.timestamp.desc()) + .limit(limit) + .all() + ) + + return [ + { + 'id': scan.id, + 'timestamp': scan.timestamp.isoformat() if scan.timestamp else None, + 'status': scan.status, + 'title': scan.title, + 'config_file': scan.config_file + } + for scan in scans + ] + + def _schedule_to_dict(self, schedule: Schedule) -> Dict[str, Any]: + """ + Convert Schedule model to dictionary. + + Args: + schedule: Schedule model instance + + Returns: + Dictionary representation + """ + return { + 'id': schedule.id, + 'name': schedule.name, + 'config_file': schedule.config_file, + 'cron_expression': schedule.cron_expression, + 'enabled': schedule.enabled, + 'last_run': schedule.last_run.isoformat() if schedule.last_run else None, + 'next_run': schedule.next_run.isoformat() if schedule.next_run else None, + 'next_run_relative': self._get_relative_time(schedule.next_run) if schedule.next_run else None, + 'created_at': schedule.created_at.isoformat() if schedule.created_at else None, + 'updated_at': schedule.updated_at.isoformat() if schedule.updated_at else None + } + + def _get_relative_time(self, dt: Optional[datetime]) -> Optional[str]: + """ + Format datetime as relative time. + + Args: + dt: Datetime to format (UTC) + + Returns: + Human-readable relative time (e.g., "in 2 hours", "yesterday") + """ + if dt is None: + return None + + now = datetime.utcnow() + diff = dt - now + + # Future times + if diff.total_seconds() > 0: + seconds = int(diff.total_seconds()) + + if seconds < 60: + return "in less than a minute" + elif seconds < 3600: + minutes = seconds // 60 + return f"in {minutes} minute{'s' if minutes != 1 else ''}" + elif seconds < 86400: + hours = seconds // 3600 + return f"in {hours} hour{'s' if hours != 1 else ''}" + elif seconds < 604800: + days = seconds // 86400 + return f"in {days} day{'s' if days != 1 else ''}" + else: + weeks = seconds // 604800 + return f"in {weeks} week{'s' if weeks != 1 else ''}" + + # Past times + else: + seconds = int(-diff.total_seconds()) + + if seconds < 60: + return "less than a minute ago" + elif seconds < 3600: + minutes = seconds // 60 + return f"{minutes} minute{'s' if minutes != 1 else ''} ago" + elif seconds < 86400: + hours = seconds // 3600 + return f"{hours} hour{'s' if hours != 1 else ''} ago" + elif seconds < 604800: + days = seconds // 86400 + return f"{days} day{'s' if days != 1 else ''} ago" + else: + weeks = seconds // 604800 + return f"{weeks} week{'s' if weeks != 1 else ''} ago"