""" Schedule service for managing scheduled scan operations. This service handles the business logic for creating, updating, and managing scheduled scans with cron expressions. """ import logging import os from datetime import datetime from typing import Any, Dict, List, Optional, Tuple from croniter import croniter from sqlalchemy.orm import Session from web.models import Schedule, Scan from web.utils.pagination import paginate, PaginatedResult logger = logging.getLogger(__name__) class ScheduleService: """ Service for managing scheduled scans. Handles schedule lifecycle: creation, validation, updating, and cron expression processing. """ def __init__(self, db_session: Session): """ Initialize schedule service. Args: db_session: SQLAlchemy database session """ self.db = db_session def create_schedule( self, name: str, config_file: str, cron_expression: str, enabled: bool = True ) -> int: """ Create a new schedule. Args: name: Human-readable schedule name config_file: Path to YAML configuration file cron_expression: Cron expression (e.g., '0 2 * * *') enabled: Whether schedule is active Returns: Schedule ID of the created schedule Raises: ValueError: If cron expression is invalid or config file doesn't exist """ # Validate cron expression is_valid, error_msg = self.validate_cron_expression(cron_expression) if not is_valid: raise ValueError(f"Invalid cron expression: {error_msg}") # Validate config file exists if not os.path.isfile(config_file): raise ValueError(f"Config file not found: {config_file}") # Calculate next run time next_run = self.calculate_next_run(cron_expression) if enabled else None # Create schedule record schedule = Schedule( name=name, config_file=config_file, cron_expression=cron_expression, enabled=enabled, last_run=None, next_run=next_run, created_at=datetime.utcnow(), updated_at=datetime.utcnow() ) self.db.add(schedule) self.db.commit() self.db.refresh(schedule) logger.info(f"Schedule {schedule.id} created: '{name}' with cron '{cron_expression}'") return schedule.id def get_schedule(self, schedule_id: int) -> Dict[str, Any]: """ Get schedule details by ID. Args: schedule_id: Schedule ID Returns: Schedule dictionary with details and execution history Raises: ValueError: If schedule not found """ schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() if not schedule: raise ValueError(f"Schedule {schedule_id} not found") # Convert to dict and include history schedule_dict = self._schedule_to_dict(schedule) schedule_dict['history'] = self.get_schedule_history(schedule_id, limit=10) return schedule_dict def list_schedules( self, page: int = 1, per_page: int = 20, enabled_filter: Optional[bool] = None ) -> Dict[str, Any]: """ List all schedules with pagination and filtering. Args: page: Page number (1-indexed) per_page: Items per page enabled_filter: Filter by enabled status (None = all) Returns: Dictionary with paginated schedules: { 'schedules': [...], 'total': int, 'page': int, 'per_page': int, 'pages': int } """ # Build query query = self.db.query(Schedule) # Apply filter if enabled_filter is not None: query = query.filter(Schedule.enabled == enabled_filter) # Order by next_run (nulls last), then by name query = query.order_by(Schedule.next_run.is_(None), Schedule.next_run, Schedule.name) # Paginate result = paginate(query, page=page, per_page=per_page) # Convert schedules to dicts schedules = [self._schedule_to_dict(s) for s in result.items] return { 'schedules': schedules, 'total': result.total, 'page': result.page, 'per_page': result.per_page, 'pages': result.pages } def update_schedule( self, schedule_id: int, **updates: Any ) -> Dict[str, Any]: """ Update schedule fields. Args: schedule_id: Schedule ID **updates: Fields to update (name, config_file, cron_expression, enabled) Returns: Updated schedule dictionary Raises: ValueError: If schedule not found or invalid updates """ schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() if not schedule: raise ValueError(f"Schedule {schedule_id} not found") # Validate cron expression if being updated if 'cron_expression' in updates: is_valid, error_msg = self.validate_cron_expression(updates['cron_expression']) if not is_valid: raise ValueError(f"Invalid cron expression: {error_msg}") # Recalculate next_run if schedule.enabled or updates.get('enabled', False): updates['next_run'] = self.calculate_next_run(updates['cron_expression']) # Validate config file if being updated if 'config_file' in updates: if not os.path.isfile(updates['config_file']): raise ValueError(f"Config file not found: {updates['config_file']}") # Handle enabled toggle if 'enabled' in updates: if updates['enabled'] and not schedule.enabled: # Being enabled - calculate next_run cron_expr = updates.get('cron_expression', schedule.cron_expression) updates['next_run'] = self.calculate_next_run(cron_expr) elif not updates['enabled'] and schedule.enabled: # Being disabled - clear next_run updates['next_run'] = None # Update fields for key, value in updates.items(): if hasattr(schedule, key): setattr(schedule, key, value) schedule.updated_at = datetime.utcnow() self.db.commit() self.db.refresh(schedule) logger.info(f"Schedule {schedule_id} updated: {list(updates.keys())}") return self._schedule_to_dict(schedule) def delete_schedule(self, schedule_id: int) -> bool: """ Delete a schedule. Note: Associated scans are NOT deleted (schedule_id becomes null). Args: schedule_id: Schedule ID Returns: True if deleted successfully Raises: ValueError: If schedule not found """ schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() if not schedule: raise ValueError(f"Schedule {schedule_id} not found") schedule_name = schedule.name self.db.delete(schedule) self.db.commit() logger.info(f"Schedule {schedule_id} ('{schedule_name}') deleted") return True def toggle_enabled(self, schedule_id: int, enabled: bool) -> Dict[str, Any]: """ Enable or disable a schedule. Args: schedule_id: Schedule ID enabled: New enabled status Returns: Updated schedule dictionary Raises: ValueError: If schedule not found """ return self.update_schedule(schedule_id, enabled=enabled) def update_run_times( self, schedule_id: int, last_run: datetime, next_run: datetime ) -> bool: """ Update last_run and next_run timestamps. Called after each execution. Args: schedule_id: Schedule ID last_run: Last execution time next_run: Next scheduled execution time Returns: True if updated successfully Raises: ValueError: If schedule not found """ schedule = self.db.query(Schedule).filter(Schedule.id == schedule_id).first() if not schedule: raise ValueError(f"Schedule {schedule_id} not found") schedule.last_run = last_run schedule.next_run = next_run schedule.updated_at = datetime.utcnow() self.db.commit() logger.debug(f"Schedule {schedule_id} run times updated: last={last_run}, next={next_run}") return True def validate_cron_expression(self, cron_expr: str) -> Tuple[bool, Optional[str]]: """ Validate a cron expression. Args: cron_expr: Cron expression to validate Returns: Tuple of (is_valid, error_message) - (True, None) if valid - (False, error_message) if invalid """ try: # Try to create a croniter instance base_time = datetime.utcnow() cron = croniter(cron_expr, base_time) # Try to get the next run time (validates the expression) cron.get_next(datetime) return (True, None) except (ValueError, KeyError) as e: return (False, str(e)) except Exception as e: return (False, f"Unexpected error: {str(e)}") def calculate_next_run( self, cron_expr: str, from_time: Optional[datetime] = None ) -> datetime: """ Calculate next run time from cron expression. Args: cron_expr: Cron expression from_time: Base time (defaults to now UTC) Returns: Next run datetime (UTC) Raises: ValueError: If cron expression is invalid """ if from_time is None: from_time = datetime.utcnow() try: cron = croniter(cron_expr, from_time) return cron.get_next(datetime) except Exception as e: raise ValueError(f"Invalid cron expression '{cron_expr}': {str(e)}") def get_schedule_history( self, schedule_id: int, limit: int = 10 ) -> List[Dict[str, Any]]: """ Get recent scans triggered by this schedule. Args: schedule_id: Schedule ID limit: Maximum number of scans to return Returns: List of scan dictionaries (recent first) """ scans = ( self.db.query(Scan) .filter(Scan.schedule_id == schedule_id) .order_by(Scan.timestamp.desc()) .limit(limit) .all() ) return [ { 'id': scan.id, 'timestamp': scan.timestamp.isoformat() if scan.timestamp else None, 'status': scan.status, 'title': scan.title, 'config_file': scan.config_file } for scan in scans ] def _schedule_to_dict(self, schedule: Schedule) -> Dict[str, Any]: """ Convert Schedule model to dictionary. Args: schedule: Schedule model instance Returns: Dictionary representation """ return { 'id': schedule.id, 'name': schedule.name, 'config_file': schedule.config_file, 'cron_expression': schedule.cron_expression, 'enabled': schedule.enabled, 'last_run': schedule.last_run.isoformat() if schedule.last_run else None, 'next_run': schedule.next_run.isoformat() if schedule.next_run else None, 'next_run_relative': self._get_relative_time(schedule.next_run) if schedule.next_run else None, 'created_at': schedule.created_at.isoformat() if schedule.created_at else None, 'updated_at': schedule.updated_at.isoformat() if schedule.updated_at else None } def _get_relative_time(self, dt: Optional[datetime]) -> Optional[str]: """ Format datetime as relative time. Args: dt: Datetime to format (UTC) Returns: Human-readable relative time (e.g., "in 2 hours", "yesterday") """ if dt is None: return None now = datetime.utcnow() diff = dt - now # Future times if diff.total_seconds() > 0: seconds = int(diff.total_seconds()) if seconds < 60: return "in less than a minute" elif seconds < 3600: minutes = seconds // 60 return f"in {minutes} minute{'s' if minutes != 1 else ''}" elif seconds < 86400: hours = seconds // 3600 return f"in {hours} hour{'s' if hours != 1 else ''}" elif seconds < 604800: days = seconds // 86400 return f"in {days} day{'s' if days != 1 else ''}" else: weeks = seconds // 604800 return f"in {weeks} week{'s' if weeks != 1 else ''}" # Past times else: seconds = int(-diff.total_seconds()) if seconds < 60: return "less than a minute ago" elif seconds < 3600: minutes = seconds // 60 return f"{minutes} minute{'s' if minutes != 1 else ''} ago" elif seconds < 86400: hours = seconds // 3600 return f"{hours} hour{'s' if hours != 1 else ''} ago" elif seconds < 604800: days = seconds // 86400 return f"{days} day{'s' if days != 1 else ''} ago" else: weeks = seconds // 604800 return f"{weeks} week{'s' if weeks != 1 else ''} ago"