""" Tests for background job execution and scheduler integration. Tests the APScheduler integration, job queuing, and background scan execution. """ import pytest import time from datetime import datetime from web.models import Scan from web.services.scan_service import ScanService from web.services.scheduler_service import SchedulerService class TestBackgroundJobs: """Test suite for background job execution.""" def test_scheduler_initialization(self, app): """Test that scheduler is initialized with Flask app.""" assert hasattr(app, 'scheduler') assert app.scheduler is not None assert app.scheduler.scheduler is not None assert app.scheduler.scheduler.running def test_queue_scan_job(self, app, db, sample_config_file): """Test queuing a scan for background execution.""" # Create a scan via service scan_service = ScanService(db) scan_id = scan_service.trigger_scan( config_file=sample_config_file, triggered_by='test', scheduler=app.scheduler ) # Verify scan was created scan = db.query(Scan).filter_by(id=scan_id).first() assert scan is not None assert scan.status == 'running' # Verify job was queued (check scheduler has the job) job = app.scheduler.scheduler.get_job(f'scan_{scan_id}') assert job is not None assert job.id == f'scan_{scan_id}' def test_trigger_scan_without_scheduler(self, db, sample_config_file): """Test triggering scan without scheduler logs warning.""" # Create scan without scheduler scan_service = ScanService(db) scan_id = scan_service.trigger_scan( config_file=sample_config_file, triggered_by='test', scheduler=None # No scheduler ) # Verify scan was created but not queued scan = db.query(Scan).filter_by(id=scan_id).first() assert scan is not None assert scan.status == 'running' def test_scheduler_service_queue_scan(self, app, db, sample_config_file): """Test SchedulerService.queue_scan directly.""" # Create scan record first scan = Scan( timestamp=datetime.utcnow(), status='running', config_file=sample_config_file, title='Test Scan', triggered_by='test' ) db.add(scan) db.commit() # Queue the scan job_id = app.scheduler.queue_scan(scan.id, sample_config_file) # Verify job was queued assert job_id == f'scan_{scan.id}' job = app.scheduler.scheduler.get_job(job_id) assert job is not None def test_scheduler_list_jobs(self, app, db, sample_config_file): """Test listing scheduled jobs.""" # Queue a few scans for i in range(3): scan = Scan( timestamp=datetime.utcnow(), status='running', config_file=sample_config_file, title=f'Test Scan {i}', triggered_by='test' ) db.add(scan) db.commit() app.scheduler.queue_scan(scan.id, sample_config_file) # List jobs jobs = app.scheduler.list_jobs() # Should have at least 3 jobs (might have more from other tests) assert len(jobs) >= 3 # Each job should have required fields for job in jobs: assert 'id' in job assert 'name' in job assert 'trigger' in job def test_scheduler_get_job_status(self, app, db, sample_config_file): """Test getting status of a specific job.""" # Create and queue a scan scan = Scan( timestamp=datetime.utcnow(), status='running', config_file=sample_config_file, title='Test Scan', triggered_by='test' ) db.add(scan) db.commit() job_id = app.scheduler.queue_scan(scan.id, sample_config_file) # Get job status status = app.scheduler.get_job_status(job_id) assert status is not None assert status['id'] == job_id assert status['name'] == f'Scan {scan.id}' def test_scheduler_get_nonexistent_job(self, app): """Test getting status of non-existent job.""" status = app.scheduler.get_job_status('nonexistent_job_id') assert status is None def test_scan_timing_fields(self, db, sample_config_file): """Test that scan timing fields are properly set.""" # Create scan with started_at scan = Scan( timestamp=datetime.utcnow(), status='running', config_file=sample_config_file, title='Test Scan', triggered_by='test', started_at=datetime.utcnow() ) db.add(scan) db.commit() # Verify fields exist assert scan.started_at is not None assert scan.completed_at is None assert scan.error_message is None # Update to completed scan.status = 'completed' scan.completed_at = datetime.utcnow() db.commit() # Verify fields updated assert scan.completed_at is not None assert (scan.completed_at - scan.started_at).total_seconds() >= 0 def test_scan_error_handling(self, db, sample_config_file): """Test that error messages are stored correctly.""" # Create failed scan scan = Scan( timestamp=datetime.utcnow(), status='failed', config_file=sample_config_file, title='Failed Scan', triggered_by='test', started_at=datetime.utcnow(), completed_at=datetime.utcnow(), error_message='Test error message' ) db.add(scan) db.commit() # Verify error message stored assert scan.error_message == 'Test error message' # Verify status query works scan_service = ScanService(db) status = scan_service.get_scan_status(scan.id) assert status['status'] == 'failed' assert status['error_message'] == 'Test error message' @pytest.mark.skip(reason="Requires actual scanner execution - slow test") def test_background_scan_execution(self, app, db, sample_config_file): """ Integration test for actual background scan execution. This test is skipped by default because it actually runs the scanner, which requires privileged operations and takes time. To run: pytest -v -k test_background_scan_execution --run-slow """ # Trigger scan scan_service = ScanService(db) scan_id = scan_service.trigger_scan( config_file=sample_config_file, triggered_by='test', scheduler=app.scheduler ) # Wait for scan to complete (with timeout) max_wait = 300 # 5 minutes start_time = time.time() while time.time() - start_time < max_wait: scan = db.query(Scan).filter_by(id=scan_id).first() if scan.status in ['completed', 'failed']: break time.sleep(5) # Verify scan completed scan = db.query(Scan).filter_by(id=scan_id).first() assert scan.status in ['completed', 'failed'] if scan.status == 'completed': assert scan.duration is not None assert scan.json_path is not None else: assert scan.error_message is not None