"""
Integration tests for Scan API endpoints.

Tests all scan management endpoints including triggering scans,
listing, retrieving details, deleting, and status polling.
"""
|
|
|
|
import json
from datetime import datetime, timezone
from pathlib import Path

import pytest

from web.models import Scan
|
|
|
|
|
|
class TestScanAPIEndpoints:
    """Integration tests for the ``/api/scans`` endpoints.

    Each test requests the ``client`` and ``db`` fixtures; ``sample_scan`` and
    ``sample_db_config`` are requested where a pre-existing row is needed.
    The fixtures are not defined in this file (presumably they live in a
    ``conftest.py`` -- confirm).
    """

    @staticmethod
    def _add_scan(db, *, status, config_id, title):
        """Stage a ``Scan`` row on *db* without committing it.

        Args:
            db: Session-like object exposing ``add``.
            status: Value stored in the scan's ``status`` field.
            config_id: Value stored in the scan's ``config_id`` field.
            title: Value stored in the scan's ``title`` field.

        Returns:
            The newly created ``Scan`` instance.
        """
        scan = Scan(
            # Naive UTC timestamp: same value datetime.utcnow() produced,
            # without relying on the API deprecated since Python 3.12.
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None),
            status=status,
            config_id=config_id,
            title=title,
            triggered_by='test',
        )
        db.add(scan)
        return scan

    def test_list_scans_empty(self, client, db):
        """Test listing scans when database is empty."""
        response = client.get('/api/scans')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert data['scans'] == []
        assert data['total'] == 0
        assert data['page'] == 1
        assert data['per_page'] == 20

    def test_list_scans_with_data(self, client, db, sample_scan):
        """Test listing scans with existing data."""
        response = client.get('/api/scans')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert data['total'] == 1
        assert len(data['scans']) == 1
        assert data['scans'][0]['id'] == sample_scan.id

    def test_list_scans_pagination(self, client, db, sample_db_config):
        """Test scan list pagination across first, middle and last pages."""
        # Create 25 scans so that per_page=10 yields two full pages and one
        # partial page.
        for i in range(25):
            self._add_scan(
                db,
                status='completed',
                config_id=sample_db_config.id,
                title=f'Test Scan {i}',
            )
        db.commit()

        # Test page 1
        response = client.get('/api/scans?page=1&per_page=10')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert data['total'] == 25
        assert len(data['scans']) == 10
        assert data['page'] == 1
        assert data['per_page'] == 10
        assert data['total_pages'] == 3
        assert data['has_next'] is True
        assert data['has_prev'] is False

        # Test page 2
        response = client.get('/api/scans?page=2&per_page=10')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert len(data['scans']) == 10
        assert data['page'] == 2
        assert data['has_next'] is True
        assert data['has_prev'] is True

        # Test page 3 (last, partial page): 25 - 2 * 10 = 5 scans remain.
        response = client.get('/api/scans?page=3&per_page=10')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert len(data['scans']) == 5
        assert data['page'] == 3
        assert data['has_next'] is False
        assert data['has_prev'] is True

    def test_list_scans_status_filter(self, client, db, sample_db_config):
        """Test filtering scans by status."""
        # Create one scan per status. The config id comes from the fixture
        # rather than a hard-coded value so the referenced config row exists.
        for status in ['running', 'completed', 'failed']:
            self._add_scan(
                db,
                status=status,
                config_id=sample_db_config.id,
                title=f'{status.capitalize()} Scan',
            )
        db.commit()

        # Filter by completed
        response = client.get('/api/scans?status=completed')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert data['total'] == 1
        # Guard the index below so a wrong result fails with a clear assert
        # instead of an IndexError.
        assert len(data['scans']) == 1
        assert data['scans'][0]['status'] == 'completed'

    def test_list_scans_invalid_page(self, client, db):
        """Test listing scans with invalid page parameter."""
        response = client.get('/api/scans?page=0')
        assert response.status_code == 400

        data = json.loads(response.data)
        assert 'error' in data

    def test_get_scan_success(self, client, db, sample_scan):
        """Test retrieving a specific scan."""
        response = client.get(f'/api/scans/{sample_scan.id}')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert data['id'] == sample_scan.id
        assert data['title'] == sample_scan.title
        assert data['status'] == sample_scan.status

    def test_get_scan_not_found(self, client, db):
        """Test retrieving a non-existent scan."""
        response = client.get('/api/scans/99999')
        assert response.status_code == 404

        data = json.loads(response.data)
        assert 'error' in data
        assert data['error'] == 'Not found'

    def test_trigger_scan_success(self, client, db, sample_db_config):
        """Test triggering a new scan."""
        response = client.post(
            '/api/scans',
            json={'config_id': sample_db_config.id},
            content_type='application/json',
        )
        assert response.status_code == 201

        data = json.loads(response.data)
        assert 'scan_id' in data
        assert data['status'] == 'running'
        assert data['message'] == 'Scan queued successfully'

        # Verify scan was created in database
        scan = db.query(Scan).filter_by(id=data['scan_id']).first()
        assert scan is not None
        assert scan.status == 'running'
        assert scan.triggered_by == 'api'

    def test_trigger_scan_missing_config_id(self, client, db):
        """Test triggering scan without config_id."""
        response = client.post(
            '/api/scans',
            json={},
            content_type='application/json',
        )
        assert response.status_code == 400

        data = json.loads(response.data)
        assert 'error' in data
        assert 'config_id is required' in data['message']

    def test_trigger_scan_invalid_config_id(self, client, db):
        """Test triggering scan with non-existent config."""
        response = client.post(
            '/api/scans',
            json={'config_id': 99999},
            content_type='application/json',
        )
        assert response.status_code == 400

        data = json.loads(response.data)
        assert 'error' in data

    def test_delete_scan_success(self, client, db, sample_scan):
        """Test deleting a scan."""
        # Capture the id up front; it is compared after the row is deleted.
        scan_id = sample_scan.id

        response = client.delete(f'/api/scans/{scan_id}')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert data['scan_id'] == scan_id
        assert 'deleted successfully' in data['message']

        # Verify scan was deleted from database
        scan = db.query(Scan).filter_by(id=scan_id).first()
        assert scan is None

    def test_delete_scan_not_found(self, client, db):
        """Test deleting a non-existent scan."""
        response = client.delete('/api/scans/99999')
        assert response.status_code == 404

        data = json.loads(response.data)
        assert 'error' in data

    def test_get_scan_status_success(self, client, db, sample_scan):
        """Test getting scan status."""
        response = client.get(f'/api/scans/{sample_scan.id}/status')
        assert response.status_code == 200

        data = json.loads(response.data)
        assert data['scan_id'] == sample_scan.id
        assert data['status'] == sample_scan.status
        assert 'timestamp' in data

    def test_get_scan_status_not_found(self, client, db):
        """Test getting status for non-existent scan."""
        response = client.get('/api/scans/99999/status')
        assert response.status_code == 404

        data = json.loads(response.data)
        assert 'error' in data

    def test_api_error_handling(self, client, db):
        """Test API error responses are properly formatted."""
        # Test 404
        response = client.get('/api/scans/99999')
        assert response.status_code == 404
        data = json.loads(response.data)
        assert 'error' in data
        assert 'message' in data

        # Test 400
        response = client.post('/api/scans', json={})
        assert response.status_code == 400
        data = json.loads(response.data)
        assert 'error' in data
        assert 'message' in data

    def test_scan_workflow_integration(self, client, db, sample_db_config):
        """
        Test complete scan workflow: trigger → status → retrieve → delete.

        This integration test verifies the entire scan lifecycle through
        the API endpoints.
        """
        # Step 1: Trigger scan
        response = client.post(
            '/api/scans',
            json={'config_id': sample_db_config.id},
            content_type='application/json',
        )
        assert response.status_code == 201
        data = json.loads(response.data)
        scan_id = data['scan_id']

        # Step 2: Check status
        response = client.get(f'/api/scans/{scan_id}/status')
        assert response.status_code == 200
        data = json.loads(response.data)
        assert data['scan_id'] == scan_id
        assert data['status'] == 'running'

        # Step 3: List scans (verify it appears)
        response = client.get('/api/scans')
        assert response.status_code == 200
        data = json.loads(response.data)
        assert data['total'] == 1
        assert data['scans'][0]['id'] == scan_id

        # Step 4: Get scan details
        response = client.get(f'/api/scans/{scan_id}')
        assert response.status_code == 200
        data = json.loads(response.data)
        assert data['id'] == scan_id

        # Step 5: Delete scan
        response = client.delete(f'/api/scans/{scan_id}')
        assert response.status_code == 200

        # Step 6: Verify deletion
        response = client.get(f'/api/scans/{scan_id}')
        assert response.status_code == 404
|