Files
SneakyScan/web/services/scheduler_service.py
Phillip Tarrant 9b88f42297 Phase 3 Step 6: Complete Scheduler Integration with Bug Fixes
Implemented complete scheduler integration with automatic schedule loading,
orphaned scan cleanup, and conversion to local timezone for better UX.

Backend Changes:
- Added load_schedules_on_startup() to load enabled schedules on app start
- Implemented cleanup_orphaned_scans() to handle crashed/interrupted scans
- Converted scheduler from UTC to local system timezone throughout
- Enhanced scheduler service with robust error handling and logging

Frontend Changes:
- Updated all schedule UI templates to display local time instead of UTC
- Improved timezone indicators and user messaging
- Removed confusing timezone converter (no longer needed)
- Updated quick templates and help text for local time

Bug Fixes:
- Fixed critical timezone bug causing cron expressions to run at wrong times
- Fixed orphaned scans stuck in 'running' status after system crashes
- Improved time display clarity across all schedule pages

All schedules now use local system time for intuitive scheduling.
2025-11-14 15:44:13 -06:00

357 lines
12 KiB
Python

"""
Scheduler service for managing background jobs and scheduled scans.
This service integrates APScheduler with Flask to enable background
scan execution and future scheduled scanning capabilities.
"""
import logging
from datetime import datetime, timezone
from typing import Optional
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from flask import Flask
from web.jobs.scan_job import execute_scan
logger = logging.getLogger(__name__)
class SchedulerService:
    """
    Service for managing background job scheduling.

    Uses APScheduler's BackgroundScheduler to run scans asynchronously
    without blocking HTTP requests.

    Job IDs follow two conventions used throughout this class:
        - 'scan_<scan_id>' for one-off scans queued via queue_scan()
        - 'schedule_<schedule_id>' for recurring cron jobs
    """

    def __init__(self):
        """Initialize scheduler service (scheduler not started yet)."""
        # Both attributes stay None until init_scheduler() is called.
        self.scheduler: Optional[BackgroundScheduler] = None
        self.db_url: Optional[str] = None

    def init_scheduler(self, app: Flask):
        """
        Initialize and start APScheduler with Flask app.

        Calling this while a scheduler is already set only logs a warning
        and returns without touching the existing scheduler.

        Args:
            app: Flask application instance. Its config must contain
                SQLALCHEMY_DATABASE_URI; SCHEDULER_MAX_WORKERS and
                SCHEDULER_MAX_INSTANCES are optional and default to 3.

        Configuration:
            - BackgroundScheduler: Runs in separate thread
            - ThreadPoolExecutor: Allows concurrent scan execution
            - Max workers: 3 (configurable via SCHEDULER_MAX_WORKERS)
        """
        if self.scheduler:
            logger.warning("Scheduler already initialized")
            return

        # Store database URL for passing to background jobs
        self.db_url = app.config['SQLALCHEMY_DATABASE_URI']

        # Configure executor for concurrent jobs
        max_workers = app.config.get('SCHEDULER_MAX_WORKERS', 3)
        executors = {
            'default': ThreadPoolExecutor(max_workers=max_workers),
        }

        # Configure job defaults
        job_defaults = {
            'coalesce': True,  # Combine multiple pending instances into one
            'max_instances': app.config.get('SCHEDULER_MAX_INSTANCES', 3),
            'misfire_grace_time': 60,  # Allow 60 seconds for delayed starts
        }

        # No timezone is passed, so the scheduler uses its default
        # (the local system timezone). This lets users write cron
        # expressions in their own local time.
        scheduler = BackgroundScheduler(
            executors=executors,
            job_defaults=job_defaults,
        )

        # Only publish the scheduler on self once it has actually started,
        # so a failed start does not leave the service looking initialized.
        scheduler.start()
        self.scheduler = scheduler
        logger.info(f"APScheduler started with {max_workers} max workers")

        # Register shutdown handler (shutdown() is a no-op when already stopped)
        import atexit
        atexit.register(self.shutdown)

    def shutdown(self):
        """
        Shutdown scheduler gracefully.

        Waits for running jobs to complete before shutting down. Does
        nothing when no scheduler is set.
        """
        if self.scheduler:
            logger.info("Shutting down APScheduler...")
            self.scheduler.shutdown(wait=True)
            logger.info("APScheduler shutdown complete")
            self.scheduler = None

    def load_schedules_on_startup(self):
        """
        Load all enabled schedules from database and register with APScheduler.

        Should be called after init_scheduler() to restore scheduled jobs
        that were active when the application last shut down.

        A schedule that fails to register is logged and skipped so the
        remaining schedules still load. Database errors are logged and
        swallowed rather than propagated.

        Raises:
            RuntimeError: If scheduler not initialized
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        # Import here to avoid circular imports
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        from web.models import Schedule

        try:
            # Short-lived engine/session dedicated to this call
            engine = create_engine(self.db_url)
            session = sessionmaker(bind=engine)()
            try:
                # Query all enabled schedules.
                # SQLAlchemy needs '==' here to build the SQL comparison.
                enabled_schedules = (
                    session.query(Schedule)
                    .filter(Schedule.enabled == True)  # noqa: E712
                    .all()
                )
                logger.info(f"Loading {len(enabled_schedules)} enabled schedules on startup")

                # Register each schedule with APScheduler
                for schedule in enabled_schedules:
                    try:
                        self.add_scheduled_scan(
                            schedule_id=schedule.id,
                            config_file=schedule.config_file,
                            cron_expression=schedule.cron_expression,
                        )
                        logger.info(f"Loaded schedule {schedule.id}: '{schedule.name}'")
                    except Exception as e:
                        # One bad schedule must not block the others.
                        logger.exception(
                            f"Failed to load schedule {schedule.id} ('{schedule.name}'): {e}"
                        )

                logger.info("Schedule loading complete")
            finally:
                session.close()
                # The engine was created just for this call; dispose of it
                # so its connection pool is not leaked.
                engine.dispose()
        except Exception as e:
            logger.exception(f"Error loading schedules on startup: {e}")

    def queue_scan(self, scan_id: int, config_file: str) -> str:
        """
        Queue a scan for immediate background execution.

        The job is added without a trigger, and replaces any existing job
        with the same 'scan_<scan_id>' ID.

        Args:
            scan_id: Database ID of the scan
            config_file: Path to YAML configuration file

        Returns:
            Job ID from APScheduler

        Raises:
            RuntimeError: If scheduler not initialized
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        # Add job to run immediately
        job = self.scheduler.add_job(
            func=execute_scan,
            args=[scan_id, config_file, self.db_url],
            id=f'scan_{scan_id}',
            name=f'Scan {scan_id}',
            replace_existing=True,
            misfire_grace_time=300,  # 5 minutes
        )
        logger.info(f"Queued scan {scan_id} for background execution (job_id={job.id})")
        return job.id

    def add_scheduled_scan(self, schedule_id: int, config_file: str,
                           cron_expression: str) -> str:
        """
        Add a recurring scheduled scan.

        Replaces any existing job registered for the same schedule.

        Args:
            schedule_id: Database ID of the schedule
            config_file: Path to YAML configuration file. Not used when
                registering the job; the config file is re-read from the
                schedule record each time the job fires.
            cron_expression: Cron expression (e.g., "0 2 * * *" for 2am daily)

        Returns:
            Job ID from APScheduler

        Raises:
            RuntimeError: If scheduler not initialized
            ValueError: If cron expression is invalid
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        from apscheduler.triggers.cron import CronTrigger

        # Create cron trigger from expression. No timezone is passed, so
        # the trigger uses its default (local system timezone), which lets
        # users specify times in their local timezone.
        try:
            trigger = CronTrigger.from_crontab(cron_expression)
        except (ValueError, KeyError) as e:
            # Normalize to ValueError for callers, keeping the original cause.
            raise ValueError(f"Invalid cron expression '{cron_expression}': {e}") from e

        # Add cron job
        job = self.scheduler.add_job(
            func=self._trigger_scheduled_scan,
            args=[schedule_id],
            trigger=trigger,
            id=f'schedule_{schedule_id}',
            name=f'Schedule {schedule_id}',
            replace_existing=True,
            max_instances=1,  # Only one instance per schedule
        )
        logger.info(f"Added scheduled scan {schedule_id} with cron '{cron_expression}' (job_id={job.id})")
        return job.id

    def remove_scheduled_scan(self, schedule_id: int):
        """
        Remove a scheduled scan job.

        Removal is best-effort: a failure (for example, the job does not
        exist) is logged as a warning and not raised.

        Args:
            schedule_id: Database ID of the schedule

        Raises:
            RuntimeError: If scheduler not initialized
        """
        if not self.scheduler:
            raise RuntimeError("Scheduler not initialized. Call init_scheduler() first.")

        job_id = f'schedule_{schedule_id}'
        try:
            self.scheduler.remove_job(job_id)
            logger.info(f"Removed scheduled scan job: {job_id}")
        except Exception as e:
            logger.warning(f"Failed to remove scheduled scan job {job_id}: {e}")

    def _compute_run_times(self, cron_expression: str):
        """
        Compute the run times recorded after a schedule fires.

        Args:
            cron_expression: Cron expression of the schedule that fired

        Returns:
            Tuple (last_run, next_run) of naive datetimes expressed in UTC.
            last_run is the current time; next_run is the next time the
            cron expression matches after it.
        """
        from croniter import croniter

        # Take a single timestamp so last_run and next_run are derived
        # from the same instant.
        now_utc = datetime.now(timezone.utc)

        # The cron job itself fires in the scheduler's timezone (local
        # time), so the expression must be evaluated against that wall
        # clock, not against UTC, or next_run is off by the UTC offset.
        # astimezone(None) falls back to the system local timezone.
        scheduler_tz = getattr(self.scheduler, 'timezone', None)
        now_local = now_utc.astimezone(scheduler_tz)
        next_local = croniter(cron_expression, now_local).get_next(datetime)

        # NOTE(review): run times are stored as naive UTC, matching the
        # value previously written for last_run - confirm against
        # ScheduleService.update_run_times().
        next_run = next_local.astimezone(timezone.utc).replace(tzinfo=None)
        return now_utc.replace(tzinfo=None), next_run

    def _trigger_scheduled_scan(self, schedule_id: int):
        """
        Internal method to trigger a scan from a schedule.

        Creates a new scan record, queues it for execution and then
        updates the schedule's last_run / next_run. Missing or disabled
        schedules are logged and skipped. Any exception is logged and
        swallowed so it does not propagate to the scheduler.

        Args:
            schedule_id: Database ID of the schedule
        """
        logger.info(f"Scheduled scan triggered: schedule_id={schedule_id}")

        # Import here to avoid circular imports
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        from web.services.schedule_service import ScheduleService
        from web.services.scan_service import ScanService

        try:
            # Short-lived engine/session dedicated to this run
            engine = create_engine(self.db_url)
            session = sessionmaker(bind=engine)()
            try:
                # Get schedule details
                schedule_service = ScheduleService(session)
                schedule = schedule_service.get_schedule(schedule_id)
                if not schedule:
                    logger.error(f"Schedule {schedule_id} not found")
                    return
                if not schedule['enabled']:
                    logger.warning(f"Schedule {schedule_id} is disabled, skipping execution")
                    return

                # Create and trigger scan
                scan_service = ScanService(session)
                scan_id = scan_service.trigger_scan(
                    config_file=schedule['config_file'],
                    triggered_by='scheduled',
                    schedule_id=schedule_id,
                    scheduler=None,  # Don't pass scheduler to avoid recursion
                )

                # Queue the scan for execution
                self.queue_scan(scan_id, schedule['config_file'])

                # Update schedule's last_run and next_run
                last_run, next_run = self._compute_run_times(schedule['cron_expression'])
                schedule_service.update_run_times(
                    schedule_id=schedule_id,
                    last_run=last_run,
                    next_run=next_run,
                )
                logger.info(f"Scheduled scan completed: schedule_id={schedule_id}, scan_id={scan_id}")
            finally:
                session.close()
                # This method runs on every cron firing; without dispose()
                # each run would leave a connection pool behind.
                engine.dispose()
        except Exception as e:
            logger.exception(f"Error triggering scheduled scan {schedule_id}: {e}")

    @staticmethod
    def _job_to_dict(job) -> dict:
        """
        Convert an APScheduler job into a plain dictionary.

        Args:
            job: Object exposing id, name, trigger and (optionally)
                next_run_time attributes

        Returns:
            Dictionary with 'id', 'name', 'next_run_time' (ISO 8601 string,
            or None when the job has no next run time) and 'trigger'
            (string form of the trigger)
        """
        # getattr keeps this safe for a job that has no next_run_time set.
        next_run_time = getattr(job, 'next_run_time', None)
        return {
            'id': job.id,
            'name': job.name,
            'next_run_time': next_run_time.isoformat() if next_run_time else None,
            'trigger': str(job.trigger),
        }

    def get_job_status(self, job_id: str) -> Optional[dict]:
        """
        Get status of a scheduled job.

        Args:
            job_id: APScheduler job ID

        Returns:
            Dictionary with job information, or None if the scheduler is
            not initialized or the job is not found
        """
        if not self.scheduler:
            return None

        job = self.scheduler.get_job(job_id)
        if not job:
            return None
        return self._job_to_dict(job)

    def list_jobs(self) -> list:
        """
        List all scheduled jobs.

        Returns:
            List of job information dictionaries (empty if the scheduler
            is not initialized)
        """
        if not self.scheduler:
            return []
        return [self._job_to_dict(job) for job in self.scheduler.get_jobs()]