"""
|
|
Scheduler service for managing background jobs and scheduled scans.
|
|
|
|
This service integrates APScheduler with Flask to enable background
|
|
scan execution and future scheduled scanning capabilities.
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.executors.pool import ThreadPoolExecutor
|
|
from flask import Flask
|
|
|
|
from web.jobs.scan_job import execute_scan
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|


class SchedulerService:
    """
    Service for managing background job scheduling.

    Uses APScheduler's BackgroundScheduler to run scans asynchronously
    without blocking HTTP requests.
    """

    def __init__(self):
        """Initialize scheduler service (scheduler not started yet)."""
        self.scheduler: Optional[BackgroundScheduler] = None
        self.db_url: Optional[str] = None

    def init_scheduler(self, app: Flask):
        """
        Initialize and start APScheduler with the Flask app.

        Args:
            app: Flask application instance

        Configuration:
            - BackgroundScheduler: Runs in a separate thread
            - ThreadPoolExecutor: Allows concurrent scan execution
            - Max workers: 3 (configurable via SCHEDULER_MAX_WORKERS)
        """
        if self.scheduler:
            logger.warning("Scheduler already initialized")
            return

        # Store database URL for passing to background jobs
        self.db_url = app.config['SQLALCHEMY_DATABASE_URI']

        # Configure executor for concurrent jobs
        max_workers = app.config.get('SCHEDULER_MAX_WORKERS', 3)
        executors = {
            'default': ThreadPoolExecutor(max_workers=max_workers)
        }

        # Configure job defaults
        job_defaults = {
            'coalesce': True,  # Combine multiple pending instances into one
            'max_instances': app.config.get('SCHEDULER_MAX_INSTANCES', 3),
            'misfire_grace_time': 60  # Allow 60 seconds for delayed starts
        }

        # Create the scheduler with the local system timezone (APScheduler's
        # default), so users can schedule jobs using their local time.
        self.scheduler = BackgroundScheduler(
            executors=executors,
            job_defaults=job_defaults
            # timezone defaults to local system timezone
        )

        # Start scheduler
        self.scheduler.start()
        logger.info(f"APScheduler started with {max_workers} max workers")

        # Register shutdown handler
        import atexit
        atexit.register(lambda: self.shutdown())
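
    # Usage sketch (illustrative, not part of this module): wiring the service
    # into a Flask app factory. The `create_app` name, the database URI, and
    # the module-level `scheduler_service` singleton are assumptions about the
    # surrounding application.
    #
    #     scheduler_service = SchedulerService()
    #
    #     def create_app():
    #         app = Flask(__name__)
    #         app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    #         app.config['SCHEDULER_MAX_WORKERS'] = 3
    #         scheduler_service.init_scheduler(app)
    #         scheduler_service.load_schedules_on_startup()
    #         return app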

    def shutdown(self):
        """
        Shutdown scheduler gracefully.

        Waits for running jobs to complete before shutting down.
        """
        if self.scheduler:
            logger.info("Shutting down APScheduler...")
            self.scheduler.shutdown(wait=True)
            logger.info("APScheduler shutdown complete")
            self.scheduler = None

    def load_schedules_on_startup(self):
        """
        Load all enabled schedules from the database and register them with APScheduler.

        Should be called after init_scheduler() to restore scheduled jobs
        that were active when the application was last shut down.

        Raises:
            RuntimeError: If scheduler not initialized
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        # Import here to avoid circular imports
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        from web.models import Schedule

        try:
            # Create database session
            engine = create_engine(self.db_url)
            Session = sessionmaker(bind=engine)
            session = Session()

            try:
                # Query all enabled schedules
                enabled_schedules = (
                    session.query(Schedule)
                    .filter(Schedule.enabled.is_(True))
                    .all()
                )

                logger.info(f"Loading {len(enabled_schedules)} enabled schedules on startup")

                # Register each schedule with APScheduler
                for schedule in enabled_schedules:
                    try:
                        self.add_scheduled_scan(
                            schedule_id=schedule.id,
                            config_id=schedule.config_id,
                            cron_expression=schedule.cron_expression
                        )
                        logger.info(f"Loaded schedule {schedule.id}: '{schedule.name}'")
                    except Exception as e:
                        logger.error(
                            f"Failed to load schedule {schedule.id} ('{schedule.name}'): {str(e)}",
                            exc_info=True
                        )

                logger.info("Schedule loading complete")

            finally:
                session.close()
                engine.dispose()  # release the engine's connection pool

        except Exception as e:
            logger.error(f"Error loading schedules on startup: {str(e)}", exc_info=True)
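
    # The loading logic above assumes a Schedule model roughly like the sketch
    # below (field names are taken from the attribute access in this module;
    # the actual model lives in web.models and may differ in detail):
    #
    #     class Schedule(db.Model):
    #         id = db.Column(db.Integer, primary_key=True)
    #         name = db.Column(db.String(255), nullable=False)
    #         config_id = db.Column(db.Integer, nullable=False)
    #         cron_expression = db.Column(db.String(64), nullable=False)
    #         enabled = db.Column(db.Boolean, default=True)
    #         last_run = db.Column(db.DateTime, nullable=True)
    #         next_run = db.Column(db.DateTime, nullable=True)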

    def queue_scan(self, scan_id: int, config_id: int) -> str:
        """
        Queue a scan for immediate background execution.

        Args:
            scan_id: Database ID of the scan
            config_id: Database config ID

        Returns:
            Job ID from APScheduler

        Raises:
            RuntimeError: If scheduler not initialized
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        # Add job to run immediately
        job = self.scheduler.add_job(
            func=execute_scan,
            kwargs={'scan_id': scan_id, 'config_id': config_id, 'db_url': self.db_url},
            id=f'scan_{scan_id}',
            name=f'Scan {scan_id}',
            replace_existing=True,
            misfire_grace_time=300  # 5 minutes
        )

        logger.info(f"Queued scan {scan_id} for background execution (job_id={job.id})")
        return job.id
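
    # Usage sketch (illustrative): queueing a scan from a Flask view. The
    # blueprint, route, and the `scan_service.trigger_scan()` call are
    # assumptions about the surrounding application, not this module's API.
    #
    #     @scans_bp.route('/configs/<int:config_id>/scans', methods=['POST'])
    #     def start_scan(config_id):
    #         scan_id = scan_service.trigger_scan(config_id=config_id,
    #                                             triggered_by='manual')
    #         job_id = scheduler_service.queue_scan(scan_id, config_id)
    #         return jsonify({'scan_id': scan_id, 'job_id': job_id}), 202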

    def add_scheduled_scan(self, schedule_id: int, config_id: int,
                           cron_expression: str) -> str:
        """
        Add a recurring scheduled scan.

        Args:
            schedule_id: Database ID of the schedule
            config_id: Database config ID (not used directly; the config is
                resolved from the schedule when the job fires)
            cron_expression: Cron expression (e.g., "0 2 * * *" for 2am daily)

        Returns:
            Job ID from APScheduler

        Raises:
            RuntimeError: If scheduler not initialized
            ValueError: If cron expression is invalid
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        from apscheduler.triggers.cron import CronTrigger

        # Create a cron trigger from the expression using the local timezone,
        # so users can specify times in their local time.
        try:
            trigger = CronTrigger.from_crontab(cron_expression)
            # timezone defaults to local system timezone
        except (ValueError, KeyError) as e:
            raise ValueError(f"Invalid cron expression '{cron_expression}': {str(e)}")

        # Add cron job
        job = self.scheduler.add_job(
            func=self._trigger_scheduled_scan,
            args=[schedule_id],
            trigger=trigger,
            id=f'schedule_{schedule_id}',
            name=f'Schedule {schedule_id}',
            replace_existing=True,
            max_instances=1  # Only one instance per schedule
        )

        logger.info(f"Added scheduled scan {schedule_id} with cron '{cron_expression}' (job_id={job.id})")
        return job.id
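
    # Example cron expressions accepted by CronTrigger.from_crontab (standard
    # five-field crontab syntax: minute hour day-of-month month day-of-week):
    #
    #     "0 2 * * *"      # every day at 02:00
    #     "*/15 * * * *"   # every 15 minutes
    #     "0 22 * * 1-5"   # 22:00 on weekdays (Mon-Fri)
    #     "30 6 1 * *"     # 06:30 on the first day of each month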

    def remove_scheduled_scan(self, schedule_id: int):
        """
        Remove a scheduled scan job.

        Args:
            schedule_id: Database ID of the schedule

        Raises:
            RuntimeError: If scheduler not initialized
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        job_id = f'schedule_{schedule_id}'

        try:
            self.scheduler.remove_job(job_id)
            logger.info(f"Removed scheduled scan job: {job_id}")
        except Exception as e:
            logger.warning(f"Failed to remove scheduled scan job {job_id}: {str(e)}")

    def _trigger_scheduled_scan(self, schedule_id: int):
        """
        Internal method to trigger a scan from a schedule.

        Creates a new scan record and queues it for execution.

        Args:
            schedule_id: Database ID of the schedule
        """
        logger.info(f"Scheduled scan triggered: schedule_id={schedule_id}")

        # Import here to avoid circular imports
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        from web.services.schedule_service import ScheduleService
        from web.services.scan_service import ScanService

        try:
            # Create database session
            engine = create_engine(self.db_url)
            Session = sessionmaker(bind=engine)
            session = Session()

            try:
                # Get schedule details
                schedule_service = ScheduleService(session)
                schedule = schedule_service.get_schedule(schedule_id)

                if not schedule:
                    logger.error(f"Schedule {schedule_id} not found")
                    return

                if not schedule['enabled']:
                    logger.warning(f"Schedule {schedule_id} is disabled, skipping execution")
                    return

                # Create and trigger scan
                scan_service = ScanService(session)
                scan_id = scan_service.trigger_scan(
                    config_id=schedule['config_id'],
                    triggered_by='scheduled',
                    schedule_id=schedule_id,
                    scheduler=None  # Don't pass scheduler to avoid recursion
                )

                # Queue the scan for execution
                self.queue_scan(scan_id, schedule['config_id'])

                # Update the schedule's last_run and next_run, using a
                # timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
                from croniter import croniter
                now = datetime.now(timezone.utc)
                next_run = croniter(schedule['cron_expression'], now).get_next(datetime)

                schedule_service.update_run_times(
                    schedule_id=schedule_id,
                    last_run=now,
                    next_run=next_run
                )

                logger.info(f"Scheduled scan queued: schedule_id={schedule_id}, scan_id={scan_id}")

            finally:
                session.close()
                engine.dispose()  # release the engine's connection pool

        except Exception as e:
            logger.error(f"Error triggering scheduled scan {schedule_id}: {str(e)}", exc_info=True)

    def get_job_status(self, job_id: str) -> Optional[dict]:
        """
        Get status of a scheduled job.

        Args:
            job_id: APScheduler job ID

        Returns:
            Dictionary with job information, or None if not found
        """
        if not self.scheduler:
            return None

        job = self.scheduler.get_job(job_id)
        if not job:
            return None

        return {
            'id': job.id,
            'name': job.name,
            'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
            'trigger': str(job.trigger)
        }

    def list_jobs(self) -> list:
        """
        List all scheduled jobs.

        Returns:
            List of job information dictionaries
        """
        if not self.scheduler:
            return []

        jobs = self.scheduler.get_jobs()
        return [
            {
                'id': job.id,
                'name': job.name,
                'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
                'trigger': str(job.trigger)
            }
            for job in jobs
        ]
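
# Usage sketch (illustrative): exposing job introspection through a debug
# endpoint. The blueprint name and route are assumptions about the
# surrounding application.
#
#     @admin_bp.route('/scheduler/jobs')
#     def scheduler_jobs():
#         return jsonify(scheduler_service.list_jobs())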