""" Integration tests for Scan API endpoints. Tests all scan management endpoints including triggering scans, listing, retrieving details, deleting, and status polling. """ import json import pytest from pathlib import Path from datetime import datetime from web.models import Scan class TestScanAPIEndpoints: """Test suite for scan API endpoints.""" def test_list_scans_empty(self, client, db): """Test listing scans when database is empty.""" response = client.get('/api/scans') assert response.status_code == 200 data = json.loads(response.data) assert data['scans'] == [] assert data['total'] == 0 assert data['page'] == 1 assert data['per_page'] == 20 def test_list_scans_with_data(self, client, db, sample_scan): """Test listing scans with existing data.""" response = client.get('/api/scans') assert response.status_code == 200 data = json.loads(response.data) assert data['total'] == 1 assert len(data['scans']) == 1 assert data['scans'][0]['id'] == sample_scan.id def test_list_scans_pagination(self, client, db): """Test scan list pagination.""" # Create 25 scans for i in range(25): scan = Scan( timestamp=datetime.utcnow(), status='completed', config_file=f'/app/configs/test{i}.yaml', title=f'Test Scan {i}', triggered_by='test' ) db.add(scan) db.commit() # Test page 1 response = client.get('/api/scans?page=1&per_page=10') assert response.status_code == 200 data = json.loads(response.data) assert data['total'] == 25 assert len(data['scans']) == 10 assert data['page'] == 1 assert data['per_page'] == 10 assert data['total_pages'] == 3 assert data['has_next'] is True assert data['has_prev'] is False # Test page 2 response = client.get('/api/scans?page=2&per_page=10') assert response.status_code == 200 data = json.loads(response.data) assert len(data['scans']) == 10 assert data['page'] == 2 assert data['has_next'] is True assert data['has_prev'] is True def test_list_scans_status_filter(self, client, db): """Test filtering scans by status.""" # Create scans with different statuses for status in ['running', 'completed', 'failed']: scan = Scan( timestamp=datetime.utcnow(), status=status, config_file='/app/configs/test.yaml', title=f'{status.capitalize()} Scan', triggered_by='test' ) db.add(scan) db.commit() # Filter by completed response = client.get('/api/scans?status=completed') assert response.status_code == 200 data = json.loads(response.data) assert data['total'] == 1 assert data['scans'][0]['status'] == 'completed' def test_list_scans_invalid_page(self, client, db): """Test listing scans with invalid page parameter.""" response = client.get('/api/scans?page=0') assert response.status_code == 400 data = json.loads(response.data) assert 'error' in data def test_get_scan_success(self, client, db, sample_scan): """Test retrieving a specific scan.""" response = client.get(f'/api/scans/{sample_scan.id}') assert response.status_code == 200 data = json.loads(response.data) assert data['id'] == sample_scan.id assert data['title'] == sample_scan.title assert data['status'] == sample_scan.status def test_get_scan_not_found(self, client, db): """Test retrieving a non-existent scan.""" response = client.get('/api/scans/99999') assert response.status_code == 404 data = json.loads(response.data) assert 'error' in data assert data['error'] == 'Not found' def test_trigger_scan_success(self, client, db, sample_config_file): """Test triggering a new scan.""" response = client.post('/api/scans', json={'config_file': str(sample_config_file)}, content_type='application/json' ) assert response.status_code == 201 data = json.loads(response.data) assert 'scan_id' in data assert data['status'] == 'running' assert data['message'] == 'Scan queued successfully' # Verify scan was created in database scan = db.query(Scan).filter_by(id=data['scan_id']).first() assert scan is not None assert scan.status == 'running' assert scan.triggered_by == 'api' def test_trigger_scan_missing_config_file(self, client, db): """Test triggering scan without config_file.""" response = client.post('/api/scans', json={}, content_type='application/json' ) assert response.status_code == 400 data = json.loads(response.data) assert 'error' in data assert 'config_file is required' in data['message'] def test_trigger_scan_invalid_config_file(self, client, db): """Test triggering scan with non-existent config file.""" response = client.post('/api/scans', json={'config_file': '/nonexistent/config.yaml'}, content_type='application/json' ) assert response.status_code == 400 data = json.loads(response.data) assert 'error' in data def test_delete_scan_success(self, client, db, sample_scan): """Test deleting a scan.""" scan_id = sample_scan.id response = client.delete(f'/api/scans/{scan_id}') assert response.status_code == 200 data = json.loads(response.data) assert data['scan_id'] == scan_id assert 'deleted successfully' in data['message'] # Verify scan was deleted from database scan = db.query(Scan).filter_by(id=scan_id).first() assert scan is None def test_delete_scan_not_found(self, client, db): """Test deleting a non-existent scan.""" response = client.delete('/api/scans/99999') assert response.status_code == 404 data = json.loads(response.data) assert 'error' in data def test_get_scan_status_success(self, client, db, sample_scan): """Test getting scan status.""" response = client.get(f'/api/scans/{sample_scan.id}/status') assert response.status_code == 200 data = json.loads(response.data) assert data['scan_id'] == sample_scan.id assert data['status'] == sample_scan.status assert 'timestamp' in data def test_get_scan_status_not_found(self, client, db): """Test getting status for non-existent scan.""" response = client.get('/api/scans/99999/status') assert response.status_code == 404 data = json.loads(response.data) assert 'error' in data def test_api_error_handling(self, client, db): """Test API error responses are properly formatted.""" # Test 404 response = client.get('/api/scans/99999') assert response.status_code == 404 data = json.loads(response.data) assert 'error' in data assert 'message' in data # Test 400 response = client.post('/api/scans', json={}) assert response.status_code == 400 data = json.loads(response.data) assert 'error' in data assert 'message' in data def test_scan_workflow_integration(self, client, db, sample_config_file): """ Test complete scan workflow: trigger → status → retrieve → delete. This integration test verifies the entire scan lifecycle through the API endpoints. """ # Step 1: Trigger scan response = client.post('/api/scans', json={'config_file': str(sample_config_file)}, content_type='application/json' ) assert response.status_code == 201 data = json.loads(response.data) scan_id = data['scan_id'] # Step 2: Check status response = client.get(f'/api/scans/{scan_id}/status') assert response.status_code == 200 data = json.loads(response.data) assert data['scan_id'] == scan_id assert data['status'] == 'running' # Step 3: List scans (verify it appears) response = client.get('/api/scans') assert response.status_code == 200 data = json.loads(response.data) assert data['total'] == 1 assert data['scans'][0]['id'] == scan_id # Step 4: Get scan details response = client.get(f'/api/scans/{scan_id}') assert response.status_code == 200 data = json.loads(response.data) assert data['id'] == scan_id # Step 5: Delete scan response = client.delete(f'/api/scans/{scan_id}') assert response.status_code == 200 # Step 6: Verify deletion response = client.get(f'/api/scans/{scan_id}') assert response.status_code == 404