Files
SneakyScan/app/tests/test_scan_api.py

268 lines
9.1 KiB
Python

"""
Integration tests for Scan API endpoints.
Tests all scan management endpoints including triggering scans,
listing, retrieving details, deleting, and status polling.
"""
import json
from datetime import datetime, timezone
from pathlib import Path

import pytest

from web.models import Scan
class TestScanAPIEndpoints:
    """Test suite for scan API endpoints.

    Each test receives its collaborators as pytest fixtures that are defined
    outside this file (presumably in ``conftest.py`` -- confirm):

    * ``client`` -- HTTP test client exposing ``get``/``post``/``delete``;
      responses provide ``status_code`` and a JSON body in ``data``.
    * ``db`` -- database session exposing ``add``/``commit``/``query``.
    * ``sample_scan`` -- an already persisted scan with ``id``, ``title``
      and ``status`` attributes.
    * ``sample_config_file`` -- path-like object pointing at a scan config.

    Names starting with ``_`` are helpers; pytest only collects ``test_*``.
    """

    @staticmethod
    def _payload(response):
        """Return the JSON body of *response* decoded into Python objects."""
        return json.loads(response.data)

    @staticmethod
    def _add_scan(db, *, status, config_file, title):
        """Create a ``Scan`` row, add it to *db* and return it.

        The caller is responsible for committing the session.
        """
        scan = Scan(
            # Naive UTC timestamp: the same value ``datetime.utcnow()``
            # produced, without the call deprecated since Python 3.12.
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None),
            status=status,
            config_file=config_file,
            title=title,
            triggered_by='test',
        )
        db.add(scan)
        return scan

    def test_list_scans_empty(self, client, db):
        """Test listing scans when database is empty."""
        response = client.get('/api/scans')
        assert response.status_code == 200

        data = self._payload(response)
        assert data['scans'] == []
        assert data['total'] == 0
        assert data['page'] == 1
        assert data['per_page'] == 20

    def test_list_scans_with_data(self, client, db, sample_scan):
        """Test listing scans with existing data."""
        response = client.get('/api/scans')
        assert response.status_code == 200

        data = self._payload(response)
        assert data['total'] == 1
        assert len(data['scans']) == 1
        assert data['scans'][0]['id'] == sample_scan.id

    def test_list_scans_pagination(self, client, db):
        """Test scan list pagination across full, middle and final pages."""
        # Create 25 scans: two full pages of 10 plus a final page of 5.
        for i in range(25):
            self._add_scan(
                db,
                status='completed',
                config_file=f'/app/configs/test{i}.yaml',
                title=f'Test Scan {i}',
            )
        db.commit()

        # Test page 1
        response = client.get('/api/scans?page=1&per_page=10')
        assert response.status_code == 200
        data = self._payload(response)
        assert data['total'] == 25
        assert len(data['scans']) == 10
        assert data['page'] == 1
        assert data['per_page'] == 10
        assert data['total_pages'] == 3
        assert data['has_next'] is True
        assert data['has_prev'] is False

        # Test page 2
        response = client.get('/api/scans?page=2&per_page=10')
        assert response.status_code == 200
        data = self._payload(response)
        assert len(data['scans']) == 10
        assert data['page'] == 2
        assert data['has_next'] is True
        assert data['has_prev'] is True

        # Test page 3: the last page holds the 5 remaining scans and must
        # not advertise a further page.
        response = client.get('/api/scans?page=3&per_page=10')
        assert response.status_code == 200
        data = self._payload(response)
        assert len(data['scans']) == 5
        assert data['page'] == 3
        assert data['has_next'] is False
        assert data['has_prev'] is True

    def test_list_scans_status_filter(self, client, db):
        """Test filtering scans by status."""
        # Create scans with different statuses
        for status in ['running', 'completed', 'failed']:
            self._add_scan(
                db,
                status=status,
                config_file='/app/configs/test.yaml',
                title=f'{status.capitalize()} Scan',
            )
        db.commit()

        # Filter by completed
        response = client.get('/api/scans?status=completed')
        assert response.status_code == 200

        data = self._payload(response)
        assert data['total'] == 1
        assert data['scans'][0]['status'] == 'completed'

    def test_list_scans_invalid_page(self, client, db):
        """Test listing scans with invalid page parameter."""
        response = client.get('/api/scans?page=0')
        assert response.status_code == 400

        data = self._payload(response)
        assert 'error' in data

    def test_get_scan_success(self, client, db, sample_scan):
        """Test retrieving a specific scan."""
        response = client.get(f'/api/scans/{sample_scan.id}')
        assert response.status_code == 200

        data = self._payload(response)
        assert data['id'] == sample_scan.id
        assert data['title'] == sample_scan.title
        assert data['status'] == sample_scan.status

    def test_get_scan_not_found(self, client, db):
        """Test retrieving a non-existent scan."""
        response = client.get('/api/scans/99999')
        assert response.status_code == 404

        data = self._payload(response)
        assert 'error' in data
        assert data['error'] == 'Not found'

    def test_trigger_scan_success(self, client, db, sample_config_file):
        """Test triggering a new scan."""
        response = client.post(
            '/api/scans',
            json={'config_file': str(sample_config_file)},
            content_type='application/json',
        )
        assert response.status_code == 201

        data = self._payload(response)
        assert 'scan_id' in data
        assert data['status'] == 'running'
        assert data['message'] == 'Scan queued successfully'

        # Verify scan was created in database
        scan = db.query(Scan).filter_by(id=data['scan_id']).first()
        assert scan is not None
        assert scan.status == 'running'
        assert scan.triggered_by == 'api'

    def test_trigger_scan_missing_config_file(self, client, db):
        """Test triggering scan without config_file."""
        response = client.post(
            '/api/scans',
            json={},
            content_type='application/json',
        )
        assert response.status_code == 400

        data = self._payload(response)
        assert 'error' in data
        assert 'config_file is required' in data['message']

    def test_trigger_scan_invalid_config_file(self, client, db):
        """Test triggering scan with non-existent config file."""
        response = client.post(
            '/api/scans',
            json={'config_file': '/nonexistent/config.yaml'},
            content_type='application/json',
        )
        assert response.status_code == 400

        data = self._payload(response)
        assert 'error' in data

    def test_delete_scan_success(self, client, db, sample_scan):
        """Test deleting a scan."""
        # Capture the id before the delete request is issued.
        scan_id = sample_scan.id

        response = client.delete(f'/api/scans/{scan_id}')
        assert response.status_code == 200

        data = self._payload(response)
        assert data['scan_id'] == scan_id
        assert 'deleted successfully' in data['message']

        # Verify scan was deleted from database
        scan = db.query(Scan).filter_by(id=scan_id).first()
        assert scan is None

    def test_delete_scan_not_found(self, client, db):
        """Test deleting a non-existent scan."""
        response = client.delete('/api/scans/99999')
        assert response.status_code == 404

        data = self._payload(response)
        assert 'error' in data

    def test_get_scan_status_success(self, client, db, sample_scan):
        """Test getting scan status."""
        response = client.get(f'/api/scans/{sample_scan.id}/status')
        assert response.status_code == 200

        data = self._payload(response)
        assert data['scan_id'] == sample_scan.id
        assert data['status'] == sample_scan.status
        assert 'timestamp' in data

    def test_get_scan_status_not_found(self, client, db):
        """Test getting status for non-existent scan."""
        response = client.get('/api/scans/99999/status')
        assert response.status_code == 404

        data = self._payload(response)
        assert 'error' in data

    def test_api_error_handling(self, client, db):
        """Test API error responses are properly formatted."""
        # Test 404
        response = client.get('/api/scans/99999')
        assert response.status_code == 404
        data = self._payload(response)
        assert 'error' in data
        assert 'message' in data

        # Test 400
        response = client.post('/api/scans', json={})
        assert response.status_code == 400
        data = self._payload(response)
        assert 'error' in data
        assert 'message' in data

    def test_scan_workflow_integration(self, client, db, sample_config_file):
        """
        Test complete scan workflow: trigger → status → retrieve → delete.

        This integration test verifies the entire scan lifecycle through
        the API endpoints.
        """
        # Step 1: Trigger scan
        response = client.post(
            '/api/scans',
            json={'config_file': str(sample_config_file)},
            content_type='application/json',
        )
        assert response.status_code == 201
        data = self._payload(response)
        scan_id = data['scan_id']

        # Step 2: Check status
        response = client.get(f'/api/scans/{scan_id}/status')
        assert response.status_code == 200
        data = self._payload(response)
        assert data['scan_id'] == scan_id
        assert data['status'] == 'running'

        # Step 3: List scans (verify it appears)
        response = client.get('/api/scans')
        assert response.status_code == 200
        data = self._payload(response)
        assert data['total'] == 1
        assert data['scans'][0]['id'] == scan_id

        # Step 4: Get scan details
        response = client.get(f'/api/scans/{scan_id}')
        assert response.status_code == 200
        data = self._payload(response)
        assert data['id'] == scan_id

        # Step 5: Delete scan
        response = client.delete(f'/api/scans/{scan_id}')
        assert response.status_code == 200

        # Step 6: Verify deletion
        response = client.get(f'/api/scans/{scan_id}')
        assert response.status_code == 404