""" Scheduler service for managing background jobs and scheduled scans. This service integrates APScheduler with Flask to enable background scan execution and future scheduled scanning capabilities. """ import logging from datetime import datetime, timezone from typing import Optional from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor from flask import Flask from web.jobs.scan_job import execute_scan logger = logging.getLogger(__name__) class SchedulerService: """ Service for managing background job scheduling. Uses APScheduler's BackgroundScheduler to run scans asynchronously without blocking HTTP requests. """ def __init__(self): """Initialize scheduler service (scheduler not started yet).""" self.scheduler: Optional[BackgroundScheduler] = None self.db_url: Optional[str] = None def init_scheduler(self, app: Flask): """ Initialize and start APScheduler with Flask app. Args: app: Flask application instance Configuration: - BackgroundScheduler: Runs in separate thread - ThreadPoolExecutor: Allows concurrent scan execution - Max workers: 3 (configurable via SCHEDULER_MAX_WORKERS) """ if self.scheduler: logger.warning("Scheduler already initialized") return # Store database URL for passing to background jobs self.db_url = app.config['SQLALCHEMY_DATABASE_URI'] # Configure executor for concurrent jobs max_workers = app.config.get('SCHEDULER_MAX_WORKERS', 3) executors = { 'default': ThreadPoolExecutor(max_workers=max_workers) } # Configure job defaults job_defaults = { 'coalesce': True, # Combine multiple pending instances into one 'max_instances': app.config.get('SCHEDULER_MAX_INSTANCES', 3), 'misfire_grace_time': 60 # Allow 60 seconds for delayed starts } # Create scheduler with local system timezone # This allows users to schedule jobs using their local time # APScheduler will automatically use the system's local timezone self.scheduler = BackgroundScheduler( executors=executors, job_defaults=job_defaults # timezone defaults to local system timezone ) # Start scheduler self.scheduler.start() logger.info(f"APScheduler started with {max_workers} max workers") # Register shutdown handler import atexit atexit.register(lambda: self.shutdown()) def shutdown(self): """ Shutdown scheduler gracefully. Waits for running jobs to complete before shutting down. """ if self.scheduler: logger.info("Shutting down APScheduler...") self.scheduler.shutdown(wait=True) logger.info("APScheduler shutdown complete") self.scheduler = None def load_schedules_on_startup(self): """ Load all enabled schedules from database and register with APScheduler. Should be called after init_scheduler() to restore scheduled jobs that were active when the application last shutdown. Raises: RuntimeError: If scheduler not initialized """ if not self.scheduler: raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.") # Import here to avoid circular imports from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from web.models import Schedule try: # Create database session engine = create_engine(self.db_url) Session = sessionmaker(bind=engine) session = Session() try: # Query all enabled schedules enabled_schedules = ( session.query(Schedule) .filter(Schedule.enabled == True) .all() ) logger.info(f"Loading {len(enabled_schedules)} enabled schedules on startup") # Register each schedule with APScheduler for schedule in enabled_schedules: try: self.add_scheduled_scan( schedule_id=schedule.id, config_file=schedule.config_file, cron_expression=schedule.cron_expression ) logger.info(f"Loaded schedule {schedule.id}: '{schedule.name}'") except Exception as e: logger.error( f"Failed to load schedule {schedule.id} ('{schedule.name}'): {str(e)}", exc_info=True ) logger.info("Schedule loading complete") finally: session.close() except Exception as e: logger.error(f"Error loading schedules on startup: {str(e)}", exc_info=True) def queue_scan(self, scan_id: int, config_file: str) -> str: """ Queue a scan for immediate background execution. Args: scan_id: Database ID of the scan config_file: Path to YAML configuration file Returns: Job ID from APScheduler Raises: RuntimeError: If scheduler not initialized """ if not self.scheduler: raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.") # Add job to run immediately job = self.scheduler.add_job( func=execute_scan, args=[scan_id, config_file, self.db_url], id=f'scan_{scan_id}', name=f'Scan {scan_id}', replace_existing=True, misfire_grace_time=300 # 5 minutes ) logger.info(f"Queued scan {scan_id} for background execution (job_id={job.id})") return job.id def add_scheduled_scan(self, schedule_id: int, config_file: str, cron_expression: str) -> str: """ Add a recurring scheduled scan. Args: schedule_id: Database ID of the schedule config_file: Path to YAML configuration file cron_expression: Cron expression (e.g., "0 2 * * *" for 2am daily) Returns: Job ID from APScheduler Raises: RuntimeError: If scheduler not initialized ValueError: If cron expression is invalid """ if not self.scheduler: raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.") from apscheduler.triggers.cron import CronTrigger # Create cron trigger from expression using local timezone # This allows users to specify times in their local timezone try: trigger = CronTrigger.from_crontab(cron_expression) # timezone defaults to local system timezone except (ValueError, KeyError) as e: raise ValueError(f"Invalid cron expression '{cron_expression}': {str(e)}") # Add cron job job = self.scheduler.add_job( func=self._trigger_scheduled_scan, args=[schedule_id], trigger=trigger, id=f'schedule_{schedule_id}', name=f'Schedule {schedule_id}', replace_existing=True, max_instances=1 # Only one instance per schedule ) logger.info(f"Added scheduled scan {schedule_id} with cron '{cron_expression}' (job_id={job.id})") return job.id def remove_scheduled_scan(self, schedule_id: int): """ Remove a scheduled scan job. Args: schedule_id: Database ID of the schedule Raises: RuntimeError: If scheduler not initialized """ if not self.scheduler: raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.") job_id = f'schedule_{schedule_id}' try: self.scheduler.remove_job(job_id) logger.info(f"Removed scheduled scan job: {job_id}") except Exception as e: logger.warning(f"Failed to remove scheduled scan job {job_id}: {str(e)}") def _trigger_scheduled_scan(self, schedule_id: int): """ Internal method to trigger a scan from a schedule. Creates a new scan record and queues it for execution. Args: schedule_id: Database ID of the schedule """ logger.info(f"Scheduled scan triggered: schedule_id={schedule_id}") # Import here to avoid circular imports from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from web.services.schedule_service import ScheduleService from web.services.scan_service import ScanService try: # Create database session engine = create_engine(self.db_url) Session = sessionmaker(bind=engine) session = Session() try: # Get schedule details schedule_service = ScheduleService(session) schedule = schedule_service.get_schedule(schedule_id) if not schedule: logger.error(f"Schedule {schedule_id} not found") return if not schedule['enabled']: logger.warning(f"Schedule {schedule_id} is disabled, skipping execution") return # Create and trigger scan scan_service = ScanService(session) scan_id = scan_service.trigger_scan( config_file=schedule['config_file'], triggered_by='scheduled', schedule_id=schedule_id, scheduler=None # Don't pass scheduler to avoid recursion ) # Queue the scan for execution self.queue_scan(scan_id, schedule['config_file']) # Update schedule's last_run and next_run from croniter import croniter next_run = croniter(schedule['cron_expression'], datetime.utcnow()).get_next(datetime) schedule_service.update_run_times( schedule_id=schedule_id, last_run=datetime.utcnow(), next_run=next_run ) logger.info(f"Scheduled scan completed: schedule_id={schedule_id}, scan_id={scan_id}") finally: session.close() except Exception as e: logger.error(f"Error triggering scheduled scan {schedule_id}: {str(e)}", exc_info=True) def get_job_status(self, job_id: str) -> Optional[dict]: """ Get status of a scheduled job. Args: job_id: APScheduler job ID Returns: Dictionary with job information, or None if not found """ if not self.scheduler: return None job = self.scheduler.get_job(job_id) if not job: return None return { 'id': job.id, 'name': job.name, 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, 'trigger': str(job.trigger) } def list_jobs(self) -> list: """ List all scheduled jobs. Returns: List of job information dictionaries """ if not self.scheduler: return [] jobs = self.scheduler.get_jobs() return [ { 'id': job.id, 'name': job.name, 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, 'trigger': str(job.trigger) } for job in jobs ]