"""
Scheduler service for managing background jobs and scheduled scans.

This service integrates APScheduler with Flask to enable background scan
execution and future scheduled scanning capabilities.
"""

import atexit
import logging
from datetime import datetime
from typing import Optional

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

from web.jobs.scan_job import execute_scan

logger = logging.getLogger(__name__)


class SchedulerService:
    """
    Service for managing background job scheduling.

    Uses APScheduler's BackgroundScheduler to run scans asynchronously
    without blocking HTTP requests.
    """

    def __init__(self):
        """Initialize scheduler service (scheduler not started yet)."""
        # Both attributes stay None until init_scheduler() is called.
        self.scheduler: Optional[BackgroundScheduler] = None
        self.db_url: Optional[str] = None

    def init_scheduler(self, app: Flask):
        """
        Initialize and start APScheduler with Flask app.

        Calling this while a scheduler is already set logs a warning and
        returns without changing anything.

        Args:
            app: Flask application instance. Its config must contain
                'SQLALCHEMY_DATABASE_URI'; 'SCHEDULER_MAX_WORKERS' and
                'SCHEDULER_MAX_INSTANCES' are optional (both default to 3).

        Raises:
            KeyError: If 'SQLALCHEMY_DATABASE_URI' is missing from app.config

        Configuration:
            - BackgroundScheduler: Runs in separate thread
            - ThreadPoolExecutor: Allows concurrent scan execution
            - Max workers: 3 (configurable via SCHEDULER_MAX_WORKERS)
        """
        if self.scheduler is not None:
            logger.warning("Scheduler already initialized")
            return

        # Store database URL for passing to background jobs
        self.db_url = app.config['SQLALCHEMY_DATABASE_URI']

        # Configure executor for concurrent jobs
        max_workers = app.config.get('SCHEDULER_MAX_WORKERS', 3)
        executors = {
            'default': ThreadPoolExecutor(max_workers=max_workers),
        }

        # Configure job defaults
        job_defaults = {
            # Combine multiple pending instances into one
            'coalesce': True,
            'max_instances': app.config.get('SCHEDULER_MAX_INSTANCES', 3),
            # Allow 60 seconds for delayed starts
            'misfire_grace_time': 60,
        }

        self.scheduler = BackgroundScheduler(
            executors=executors,
            job_defaults=job_defaults,
            timezone='UTC',
        )
        self.scheduler.start()
        logger.info("APScheduler started with %s max workers", max_workers)

        # Register shutdown handler. shutdown() is a no-op once the
        # scheduler has been cleared, so an earlier explicit shutdown is safe.
        atexit.register(self.shutdown)

    def shutdown(self):
        """
        Shutdown scheduler gracefully.

        Waits for running jobs to complete before shutting down, then
        clears the scheduler reference. Does nothing if no scheduler is set.
        """
        if self.scheduler is None:
            return

        logger.info("Shutting down APScheduler...")
        self.scheduler.shutdown(wait=True)
        logger.info("APScheduler shutdown complete")
        self.scheduler = None

    def _require_scheduler(self) -> BackgroundScheduler:
        """Return the active scheduler or raise RuntimeError if not initialized."""
        if self.scheduler is None:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")
        return self.scheduler

    @staticmethod
    def _job_to_dict(job) -> dict:
        """Convert an APScheduler job into the dictionary shape this service returns."""
        next_run = job.next_run_time
        return {
            'id': job.id,
            'name': job.name,
            'next_run_time': next_run.isoformat() if next_run else None,
            'trigger': str(job.trigger),
        }

    def queue_scan(self, scan_id: int, config_file: str) -> str:
        """
        Queue a scan for immediate background execution.

        The job ID is derived from the scan ID ('scan_<scan_id>'); queuing
        the same scan again replaces the existing job.

        Args:
            scan_id: Database ID of the scan
            config_file: Path to YAML configuration file

        Returns:
            Job ID from APScheduler

        Raises:
            RuntimeError: If scheduler not initialized
        """
        scheduler = self._require_scheduler()

        # Add job to run immediately (no trigger given)
        job = scheduler.add_job(
            func=execute_scan,
            args=[scan_id, config_file, self.db_url],
            id=f'scan_{scan_id}',
            name=f'Scan {scan_id}',
            replace_existing=True,
            misfire_grace_time=300,  # 5 minutes
        )

        logger.info(
            "Queued scan %s for background execution (job_id=%s)",
            scan_id,
            job.id,
        )
        return job.id

    def add_scheduled_scan(self, schedule_id: int, config_file: str,
                           cron_expression: str) -> str:
        """
        Add a recurring scheduled scan.

        The job ID is derived from the schedule ID ('schedule_<schedule_id>');
        adding the same schedule again replaces the existing job.

        Args:
            schedule_id: Database ID of the schedule
            config_file: Path to YAML configuration file
            cron_expression: Cron expression (e.g., "0 2 * * *" for 2am daily)

        Returns:
            Job ID from APScheduler

        Raises:
            RuntimeError: If scheduler not initialized
            ValueError: If cron expression is invalid

        Note:
            This is a placeholder for Phase 3 scheduled scanning feature.
            Currently not used, but structure is in place.
        """
        scheduler = self._require_scheduler()

        # Parse cron expression
        # Format: "minute hour day month day_of_week"
        parts = cron_expression.split()
        if len(parts) != 5:
            raise ValueError(f"Invalid cron expression: {cron_expression}")

        minute, hour, day, month, day_of_week = parts

        # Add cron job (currently placeholder - will be enhanced in Phase 3)
        job = scheduler.add_job(
            func=self._trigger_scheduled_scan,
            args=[schedule_id, config_file],
            trigger='cron',
            minute=minute,
            hour=hour,
            day=day,
            month=month,
            day_of_week=day_of_week,
            id=f'schedule_{schedule_id}',
            name=f'Schedule {schedule_id}',
            replace_existing=True,
        )

        logger.info(
            "Added scheduled scan %s with cron '%s' (job_id=%s)",
            schedule_id,
            cron_expression,
            job.id,
        )
        return job.id

    def remove_scheduled_scan(self, schedule_id: int):
        """
        Remove a scheduled scan job.

        A job that does not exist is logged as a warning and otherwise
        ignored; any other error from the scheduler propagates to the caller.

        Args:
            schedule_id: Database ID of the schedule

        Raises:
            RuntimeError: If scheduler not initialized
        """
        scheduler = self._require_scheduler()
        job_id = f'schedule_{schedule_id}'

        try:
            scheduler.remove_job(job_id)
        except JobLookupError as e:
            # Only "job not found" is expected here; anything else signals a
            # real problem and must not be hidden behind a warning.
            logger.warning("Failed to remove scheduled scan job %s: %s", job_id, e)
        else:
            logger.info("Removed scheduled scan job: %s", job_id)

    def _trigger_scheduled_scan(self, schedule_id: int, config_file: str):
        """
        Internal method to trigger a scan from a schedule.

        Currently only logs that the schedule fired; config_file is unused
        until the Phase 3 implementation lands.

        Args:
            schedule_id: Database ID of the schedule
            config_file: Path to YAML configuration file

        Note:
            This will be fully implemented in Phase 3 when scheduled
            scanning is added. Currently a placeholder.
        """
        logger.info("Scheduled scan triggered: schedule_id=%s", schedule_id)
        # TODO: In Phase 3, this will:
        # 1. Create a new Scan record with triggered_by='scheduled'
        # 2. Call queue_scan() with the new scan_id
        # 3. Update schedule's last_run and next_run timestamps

    def get_job_status(self, job_id: str) -> Optional[dict]:
        """
        Get status of a scheduled job.

        Args:
            job_id: APScheduler job ID

        Returns:
            Dictionary with keys 'id', 'name', 'next_run_time' (ISO 8601
            string or None) and 'trigger', or None if the scheduler is not
            initialized or the job is not found
        """
        if self.scheduler is None:
            return None

        job = self.scheduler.get_job(job_id)
        if not job:
            return None

        return self._job_to_dict(job)

    def list_jobs(self) -> list:
        """
        List all scheduled jobs.

        Returns:
            List of job information dictionaries (same shape as
            get_job_status); empty if the scheduler is not initialized
        """
        if self.scheduler is None:
            return []

        return [self._job_to_dict(job) for job in self.scheduler.get_jobs()]