484 lines
17 KiB
Python
484 lines
17 KiB
Python
"""
Integration tests for Config API endpoints.

Tests all config API endpoints including CSV/YAML upload, listing, downloading,
and deletion with schedule protection.
"""
|
|
|
|
import pytest
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
from web.app import create_app
|
|
from web.models import Base
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import scoped_session, sessionmaker
|
|
|
|
|
|
@pytest.fixture
def app():
    """Create a test application backed by throwaway storage.

    Each test gets its own temporary SQLite database file and its own
    temporary configs directory. ``CONFIGS_DIR`` is pointed at that directory
    while the test runs and is put back to its previous state afterwards, so
    the override cannot leak into unrelated tests. Temporary files are removed
    even when setup or the test itself raises.

    Yields:
        The application object returned by ``create_app``.
    """
    # mkstemp creates the file atomically. tempfile.mktemp only returns a
    # name, is deprecated, and lets another process claim the path first.
    db_fd, test_db = tempfile.mkstemp(suffix='.db')
    os.close(db_fd)

    # Create temporary configs directory
    temp_configs_dir = tempfile.mkdtemp()

    # Remember the caller's value so teardown can restore it exactly.
    previous_configs_dir = os.environ.get('CONFIGS_DIR')

    try:
        flask_app = create_app({
            'TESTING': True,
            'SQLALCHEMY_DATABASE_URI': f'sqlite:///{test_db}',
            'SECRET_KEY': 'test-secret-key',
            'WTF_CSRF_ENABLED': False,
        })

        # Override configs directory in ConfigService
        os.environ['CONFIGS_DIR'] = temp_configs_dir

        # Create tables
        with flask_app.app_context():
            Base.metadata.create_all(bind=flask_app.db_session.get_bind())

        yield flask_app
    finally:
        # Restore the environment first so a cleanup failure below cannot
        # leave the override in place.
        if previous_configs_dir is None:
            os.environ.pop('CONFIGS_DIR', None)
        else:
            os.environ['CONFIGS_DIR'] = previous_configs_dir

        shutil.rmtree(temp_configs_dir, ignore_errors=True)

        try:
            os.unlink(test_db)
        except FileNotFoundError:
            # Already gone; nothing left to clean up.
            pass
|
|
|
|
|
|
@pytest.fixture
def client(app):
    """Provide a test client for the application from the ``app`` fixture."""
    test_client = app.test_client()
    return test_client
|
|
|
|
|
|
@pytest.fixture
def auth_headers(client):
    """Create a user, log it in through ``client``, and return request headers.

    Authentication is session-based, so the returned mapping is empty; the
    fixture exists for its side effect of logging the client in.

    Returns:
        An empty dict to pass as ``headers=``.
    """
    # Imported here rather than at module level, as in the original fixture.
    from web.auth.models import User

    flask_app = client.application
    credentials = {
        'username': 'testuser',
        'password': 'testpass'
    }

    # Persist the account the login request below will authenticate as.
    with flask_app.app_context():
        account = User(username=credentials['username'])
        account.set_password(credentials['password'])
        db = flask_app.db_session
        db.add(account)
        db.commit()

    login_response = client.post(
        '/auth/login',
        data=credentials,
        follow_redirects=True,
    )
    assert login_response.status_code == 200

    # Session-based auth: no explicit headers are needed.
    return {}
|
|
|
|
|
|
@pytest.fixture
def sample_csv():
    """Return CSV text: one header row followed by two host rows.

    Both rows belong to the scan "Test Scan" and the site "Web Servers".
    The text ends with a trailing newline.
    """
    header = 'scan_title,site_name,ip_address,ping_expected,tcp_ports,udp_ports,services'
    host_rows = (
        'Test Scan,Web Servers,10.10.20.4,true,"22,80,443",53,"ssh,http,https"',
        'Test Scan,Web Servers,10.10.20.5,true,22,,"ssh"',
    )
    return '\n'.join((header, *host_rows)) + '\n'
|
|
|
|
|
|
@pytest.fixture
def sample_yaml():
    """Return a YAML scan config with one site containing one IP.

    The document is written line by line so the nesting is explicit:
    ``sites`` holds a list of sites, each site holds ``ips``, and each IP
    carries its ``expected`` ping/port/service values. Without that
    indentation every key would become a top-level sibling of ``title``.
    """
    return (
        "title: Test Scan\n"
        "sites:\n"
        "  - name: Web Servers\n"
        "    ips:\n"
        "      - address: 10.10.20.4\n"
        "        expected:\n"
        "          ping: true\n"
        "          tcp_ports: [22, 80, 443]\n"
        "          udp_ports: [53]\n"
        "          services: [ssh, http, https]\n"
    )
|
|
|
|
|
|
class TestListConfigs:
    """Exercise the listing endpoint, GET /api/configs."""

    def test_list_configs_empty(self, client, auth_headers):
        """With no config files present the ``configs`` list is empty."""
        resp = client.get('/api/configs', headers=auth_headers)
        assert resp.status_code == 200

        body = resp.get_json()
        assert 'configs' in body
        assert body['configs'] == []

    def test_list_configs_with_files(self, client, auth_headers, app, sample_yaml):
        """A YAML file written to the configs directory shows up with metadata."""
        # Place a config file where the API looks for configs.
        configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs')
        target = os.path.join(configs_dir, 'test-scan.yaml')
        with open(target, 'w') as handle:
            handle.write(sample_yaml)

        resp = client.get('/api/configs', headers=auth_headers)
        assert resp.status_code == 200

        body = resp.get_json()
        assert len(body['configs']) == 1

        entry = body['configs'][0]
        assert entry['filename'] == 'test-scan.yaml'
        assert entry['title'] == 'Test Scan'
        for field in ('created_at', 'size_bytes', 'used_by_schedules'):
            assert field in entry

    def test_list_configs_requires_auth(self, client):
        """An unauthenticated request is rejected or redirected."""
        resp = client.get('/api/configs')
        # 401 = unauthorized, 302 = redirect
        assert resp.status_code in (401, 302)
|
|
|
|
|
|
class TestGetConfig:
    """Exercise the single-config endpoint, GET /api/configs/<filename>."""

    def test_get_config_valid(self, client, auth_headers, app, sample_yaml):
        """An existing file is returned with its name, raw content and parsed data."""
        # Place a config file where the API looks for configs.
        configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs')
        target = os.path.join(configs_dir, 'test-scan.yaml')
        with open(target, 'w') as handle:
            handle.write(sample_yaml)

        resp = client.get('/api/configs/test-scan.yaml', headers=auth_headers)
        assert resp.status_code == 200

        body = resp.get_json()
        assert body['filename'] == 'test-scan.yaml'
        for field in ('content', 'parsed'):
            assert field in body
        assert body['parsed']['title'] == 'Test Scan'

    def test_get_config_not_found(self, client, auth_headers):
        """An unknown filename yields 404 with an ``error`` entry."""
        resp = client.get('/api/configs/nonexistent.yaml', headers=auth_headers)
        assert resp.status_code == 404

        body = resp.get_json()
        assert 'error' in body

    def test_get_config_requires_auth(self, client):
        """An unauthenticated request is rejected or redirected."""
        resp = client.get('/api/configs/test.yaml')
        assert resp.status_code in (401, 302)
|
|
|
|
|
|
class TestUploadCSV:
    """Tests for POST /api/configs/upload-csv."""

    @staticmethod
    def _form(content, filename='test.csv'):
        """Build a fresh multipart form payload holding one uploaded file.

        A new stream is created on every call. A ``BytesIO`` is read to its
        end when a request is sent (and may be closed by the test client), so
        sharing one payload between two requests would not upload the same
        file twice.

        Args:
            content: File body, either ``str`` (encoded as UTF-8) or ``bytes``.
            filename: File name reported for the upload.

        Returns:
            A dict suitable for the test client's ``data=`` argument.
        """
        from io import BytesIO

        raw = content.encode('utf-8') if isinstance(content, str) else content
        return {
            'file': (BytesIO(raw), filename)
        }

    def test_upload_csv_valid(self, client, auth_headers, sample_csv):
        """Test uploading valid CSV"""
        response = client.post('/api/configs/upload-csv',
                               data=self._form(sample_csv),
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 200

        result = response.get_json()
        assert result['success'] is True
        assert 'filename' in result
        assert result['filename'].endswith('.yaml')
        assert 'preview' in result

    def test_upload_csv_no_file(self, client, auth_headers):
        """Test uploading without file"""
        response = client.post('/api/configs/upload-csv', data={},
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 400

        data = response.get_json()
        assert 'error' in data

    def test_upload_csv_invalid_format(self, client, auth_headers):
        """Test uploading invalid CSV"""
        invalid_csv = "not,a,valid,csv\nmissing,columns"

        response = client.post('/api/configs/upload-csv',
                               data=self._form(invalid_csv),
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 400

        result = response.get_json()
        assert 'error' in result

    def test_upload_csv_wrong_extension(self, client, auth_headers):
        """Test uploading file with wrong extension"""
        response = client.post('/api/configs/upload-csv',
                               data=self._form(b'test', 'test.txt'),
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 400

    def test_upload_csv_duplicate_filename(self, client, auth_headers, sample_csv):
        """Test uploading CSV that generates duplicate filename"""
        # Upload first time
        response1 = client.post('/api/configs/upload-csv',
                                data=self._form(sample_csv),
                                headers=auth_headers,
                                content_type='multipart/form-data')
        assert response1.status_code == 200

        # Upload second time (should fail). The payload is rebuilt so the
        # server receives the same CSV again rather than an already-consumed
        # stream; otherwise the 400 could come from an unreadable upload
        # instead of the duplicate-filename check.
        response2 = client.post('/api/configs/upload-csv',
                                data=self._form(sample_csv),
                                headers=auth_headers,
                                content_type='multipart/form-data')
        assert response2.status_code == 400

    def test_upload_csv_requires_auth(self, client, sample_csv):
        """Test that uploading CSV requires authentication"""
        response = client.post('/api/configs/upload-csv',
                               data=self._form(sample_csv),
                               content_type='multipart/form-data')
        assert response.status_code in [401, 302]
|
|
|
|
|
|
class TestUploadYAML:
    """Tests for POST /api/configs/upload-yaml."""

    @staticmethod
    def _form(content, filename='test.yaml'):
        """Build a fresh multipart form payload holding one uploaded file.

        Args:
            content: File body, either ``str`` (encoded as UTF-8) or ``bytes``.
            filename: File name reported for the upload.

        Returns:
            A dict suitable for the test client's ``data=`` argument.
        """
        from io import BytesIO

        raw = content.encode('utf-8') if isinstance(content, str) else content
        return {
            'file': (BytesIO(raw), filename)
        }

    def test_upload_yaml_valid(self, client, auth_headers, sample_yaml):
        """Test uploading valid YAML"""
        response = client.post('/api/configs/upload-yaml',
                               data=self._form(sample_yaml),
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 200

        result = response.get_json()
        assert result['success'] is True
        assert 'filename' in result

    def test_upload_yaml_no_file(self, client, auth_headers):
        """Test uploading without file"""
        response = client.post('/api/configs/upload-yaml', data={},
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 400

    def test_upload_yaml_invalid_syntax(self, client, auth_headers):
        """Test uploading YAML with invalid syntax"""
        invalid_yaml = "invalid: yaml: syntax: ["

        response = client.post('/api/configs/upload-yaml',
                               data=self._form(invalid_yaml),
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 400

    def test_upload_yaml_missing_required_fields(self, client, auth_headers):
        """Test uploading YAML missing required fields"""
        # Sites and IPs are properly nested, so the absent ``title`` is the
        # only thing wrong with this document.
        invalid_yaml = (
            "sites:\n"
            "  - name: Test\n"
            "    ips:\n"
            "      - address: 10.0.0.1\n"
        )

        response = client.post('/api/configs/upload-yaml',
                               data=self._form(invalid_yaml),
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 400

    def test_upload_yaml_wrong_extension(self, client, auth_headers):
        """Test uploading file with wrong extension"""
        response = client.post('/api/configs/upload-yaml',
                               data=self._form(b'test', 'test.txt'),
                               headers=auth_headers,
                               content_type='multipart/form-data')
        assert response.status_code == 400

    def test_upload_yaml_requires_auth(self, client, sample_yaml):
        """Test that uploading YAML requires authentication"""
        response = client.post('/api/configs/upload-yaml',
                               data=self._form(sample_yaml),
                               content_type='multipart/form-data')
        assert response.status_code in [401, 302]
|
|
|
|
|
|
class TestDownloadTemplate:
    """Exercise the CSV template endpoint, GET /api/configs/template."""

    def test_download_template(self, client, auth_headers):
        """The template is served as UTF-8 CSV and contains the leading columns."""
        resp = client.get('/api/configs/template', headers=auth_headers)

        assert resp.status_code == 200
        assert resp.content_type == 'text/csv; charset=utf-8'
        assert b'scan_title,site_name,ip_address' in resp.data

    def test_download_template_requires_auth(self, client):
        """An unauthenticated request is rejected or redirected."""
        resp = client.get('/api/configs/template')
        assert resp.status_code in (401, 302)
|
|
|
|
|
|
class TestDownloadConfig:
    """Exercise the download endpoint, GET /api/configs/<filename>/download."""

    def test_download_config_valid(self, client, auth_headers, app, sample_yaml):
        """An existing config is served as YAML with its original text."""
        # Place a config file where the API looks for configs.
        configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs')
        target = os.path.join(configs_dir, 'test-scan.yaml')
        with open(target, 'w') as handle:
            handle.write(sample_yaml)

        resp = client.get('/api/configs/test-scan.yaml/download', headers=auth_headers)

        assert resp.status_code == 200
        assert resp.content_type == 'application/x-yaml; charset=utf-8'
        assert b'title: Test Scan' in resp.data

    def test_download_config_not_found(self, client, auth_headers):
        """Downloading an unknown filename yields 404."""
        resp = client.get('/api/configs/nonexistent.yaml/download', headers=auth_headers)
        assert resp.status_code == 404

    def test_download_config_requires_auth(self, client):
        """An unauthenticated request is rejected or redirected."""
        resp = client.get('/api/configs/test.yaml/download')
        assert resp.status_code in (401, 302)
|
|
|
|
|
|
class TestDeleteConfig:
    """Exercise the delete endpoint, DELETE /api/configs/<filename>."""

    def test_delete_config_valid(self, client, auth_headers, app, sample_yaml):
        """Deleting an existing config reports success and removes the file."""
        # Place a config file where the API looks for configs.
        configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs')
        target = os.path.join(configs_dir, 'test-scan.yaml')
        with open(target, 'w') as handle:
            handle.write(sample_yaml)

        resp = client.delete('/api/configs/test-scan.yaml', headers=auth_headers)
        assert resp.status_code == 200

        body = resp.get_json()
        assert body['success'] is True

        # The file must be gone from disk, not just reported as deleted.
        assert not os.path.exists(target)

    def test_delete_config_not_found(self, client, auth_headers):
        """Deleting an unknown filename yields 404."""
        resp = client.delete('/api/configs/nonexistent.yaml', headers=auth_headers)
        assert resp.status_code == 404

    def test_delete_config_requires_auth(self, client):
        """An unauthenticated request is rejected or redirected."""
        resp = client.delete('/api/configs/test.yaml')
        assert resp.status_code in (401, 302)
|
|
|
|
|
|
class TestEndToEndWorkflow:
    """Scenario tests that chain several config endpoints together."""

    def test_complete_csv_workflow(self, client, auth_headers, sample_csv):
        """Take a CSV config through template, upload, list, get, download, delete."""
        from io import BytesIO

        # Step 1: the CSV template can be downloaded.
        template_resp = client.get('/api/configs/template', headers=auth_headers)
        assert template_resp.status_code == 200

        # Step 2: upload a CSV; the response reports the stored filename.
        upload_form = {
            'file': (BytesIO(sample_csv.encode('utf-8')), 'workflow-test.csv')
        }
        upload_resp = client.post(
            '/api/configs/upload-csv',
            data=upload_form,
            headers=auth_headers,
            content_type='multipart/form-data',
        )
        assert upload_resp.status_code == 200
        upload_result = upload_resp.get_json()
        filename = upload_result['filename']

        # Step 3: the new config appears in the listing.
        list_resp = client.get('/api/configs', headers=auth_headers)
        assert list_resp.status_code == 200
        listed = list_resp.get_json()['configs']
        assert any(entry['filename'] == filename for entry in listed)

        # Step 4: its details can be fetched.
        detail_resp = client.get(f'/api/configs/{filename}', headers=auth_headers)
        assert detail_resp.status_code == 200

        # Step 5: it can be downloaded.
        download_resp = client.get(f'/api/configs/{filename}/download', headers=auth_headers)
        assert download_resp.status_code == 200

        # Step 6: it can be deleted.
        delete_resp = client.delete(f'/api/configs/{filename}', headers=auth_headers)
        assert delete_resp.status_code == 200

        # Step 7: after deletion it is no longer found.
        gone_resp = client.get(f'/api/configs/{filename}', headers=auth_headers)
        assert gone_resp.status_code == 404

    def test_yaml_upload_workflow(self, client, auth_headers, sample_yaml):
        """Upload a YAML config, confirm it can be fetched, then remove it."""
        from io import BytesIO

        upload_form = {
            'file': (BytesIO(sample_yaml.encode('utf-8')), 'yaml-workflow.yaml')
        }
        upload_resp = client.post(
            '/api/configs/upload-yaml',
            data=upload_form,
            headers=auth_headers,
            content_type='multipart/form-data',
        )
        assert upload_resp.status_code == 200

        filename = upload_resp.get_json()['filename']

        # The uploaded config is retrievable under the reported filename.
        detail_resp = client.get(f'/api/configs/{filename}', headers=auth_headers)
        assert detail_resp.status_code == 200

        # Clean up; the outcome of the delete is deliberately not asserted.
        client.delete(f'/api/configs/{filename}', headers=auth_headers)
|