""" Integration tests for Config API endpoints. Tests all config API endpoints including CSV/YAML upload, listing, downloading, and deletion with schedule protection. """ import pytest import os import tempfile import shutil from web.app import create_app from web.models import Base from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker @pytest.fixture def app(): """Create test application""" # Create temporary database test_db = tempfile.mktemp(suffix='.db') # Create temporary configs directory temp_configs_dir = tempfile.mkdtemp() app = create_app({ 'TESTING': True, 'SQLALCHEMY_DATABASE_URI': f'sqlite:///{test_db}', 'SECRET_KEY': 'test-secret-key', 'WTF_CSRF_ENABLED': False, }) # Override configs directory in ConfigService os.environ['CONFIGS_DIR'] = temp_configs_dir # Create tables with app.app_context(): Base.metadata.create_all(bind=app.db_session.get_bind()) yield app # Cleanup os.unlink(test_db) shutil.rmtree(temp_configs_dir) @pytest.fixture def client(app): """Create test client""" return app.test_client() @pytest.fixture def auth_headers(client): """Get authentication headers""" # First register and login a user from web.auth.models import User with client.application.app_context(): # Create test user user = User(username='testuser') user.set_password('testpass') client.application.db_session.add(user) client.application.db_session.commit() # Login response = client.post('/auth/login', data={ 'username': 'testuser', 'password': 'testpass' }, follow_redirects=True) assert response.status_code == 200 # Return empty headers (session-based auth) return {} @pytest.fixture def sample_csv(): """Sample CSV content""" return """scan_title,site_name,ip_address,ping_expected,tcp_ports,udp_ports,services Test Scan,Web Servers,10.10.20.4,true,"22,80,443",53,"ssh,http,https" Test Scan,Web Servers,10.10.20.5,true,22,,"ssh" """ @pytest.fixture def sample_yaml(): """Sample YAML content""" return """title: Test Scan sites: - name: Web Servers ips: - address: 10.10.20.4 expected: ping: true tcp_ports: [22, 80, 443] udp_ports: [53] services: [ssh, http, https] """ class TestListConfigs: """Tests for GET /api/configs""" def test_list_configs_empty(self, client, auth_headers): """Test listing configs when none exist""" response = client.get('/api/configs', headers=auth_headers) assert response.status_code == 200 data = response.get_json() assert 'configs' in data assert data['configs'] == [] def test_list_configs_with_files(self, client, auth_headers, app, sample_yaml): """Test listing configs with existing files""" # Create a config file temp_configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs') config_path = os.path.join(temp_configs_dir, 'test-scan.yaml') with open(config_path, 'w') as f: f.write(sample_yaml) response = client.get('/api/configs', headers=auth_headers) assert response.status_code == 200 data = response.get_json() assert len(data['configs']) == 1 assert data['configs'][0]['filename'] == 'test-scan.yaml' assert data['configs'][0]['title'] == 'Test Scan' assert 'created_at' in data['configs'][0] assert 'size_bytes' in data['configs'][0] assert 'used_by_schedules' in data['configs'][0] def test_list_configs_requires_auth(self, client): """Test that listing configs requires authentication""" response = client.get('/api/configs') assert response.status_code in [401, 302] # Unauthorized or redirect class TestGetConfig: """Tests for GET /api/configs/""" def test_get_config_valid(self, client, auth_headers, app, sample_yaml): """Test getting a valid config file""" # Create a config file temp_configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs') config_path = os.path.join(temp_configs_dir, 'test-scan.yaml') with open(config_path, 'w') as f: f.write(sample_yaml) response = client.get('/api/configs/test-scan.yaml', headers=auth_headers) assert response.status_code == 200 data = response.get_json() assert data['filename'] == 'test-scan.yaml' assert 'content' in data assert 'parsed' in data assert data['parsed']['title'] == 'Test Scan' def test_get_config_not_found(self, client, auth_headers): """Test getting non-existent config""" response = client.get('/api/configs/nonexistent.yaml', headers=auth_headers) assert response.status_code == 404 data = response.get_json() assert 'error' in data def test_get_config_requires_auth(self, client): """Test that getting config requires authentication""" response = client.get('/api/configs/test.yaml') assert response.status_code in [401, 302] class TestUploadCSV: """Tests for POST /api/configs/upload-csv""" def test_upload_csv_valid(self, client, auth_headers, sample_csv): """Test uploading valid CSV""" from io import BytesIO data = { 'file': (BytesIO(sample_csv.encode('utf-8')), 'test.csv') } response = client.post('/api/configs/upload-csv', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 200 result = response.get_json() assert result['success'] is True assert 'filename' in result assert result['filename'].endswith('.yaml') assert 'preview' in result def test_upload_csv_no_file(self, client, auth_headers): """Test uploading without file""" response = client.post('/api/configs/upload-csv', data={}, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 400 data = response.get_json() assert 'error' in data def test_upload_csv_invalid_format(self, client, auth_headers): """Test uploading invalid CSV""" from io import BytesIO invalid_csv = "not,a,valid,csv\nmissing,columns" data = { 'file': (BytesIO(invalid_csv.encode('utf-8')), 'test.csv') } response = client.post('/api/configs/upload-csv', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 400 result = response.get_json() assert 'error' in result def test_upload_csv_wrong_extension(self, client, auth_headers): """Test uploading file with wrong extension""" from io import BytesIO data = { 'file': (BytesIO(b'test'), 'test.txt') } response = client.post('/api/configs/upload-csv', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 400 def test_upload_csv_duplicate_filename(self, client, auth_headers, sample_csv): """Test uploading CSV that generates duplicate filename""" from io import BytesIO data = { 'file': (BytesIO(sample_csv.encode('utf-8')), 'test.csv') } # Upload first time response1 = client.post('/api/configs/upload-csv', data=data, headers=auth_headers, content_type='multipart/form-data') assert response1.status_code == 200 # Upload second time (should fail) response2 = client.post('/api/configs/upload-csv', data=data, headers=auth_headers, content_type='multipart/form-data') assert response2.status_code == 400 def test_upload_csv_requires_auth(self, client, sample_csv): """Test that uploading CSV requires authentication""" from io import BytesIO data = { 'file': (BytesIO(sample_csv.encode('utf-8')), 'test.csv') } response = client.post('/api/configs/upload-csv', data=data, content_type='multipart/form-data') assert response.status_code in [401, 302] class TestUploadYAML: """Tests for POST /api/configs/upload-yaml""" def test_upload_yaml_valid(self, client, auth_headers, sample_yaml): """Test uploading valid YAML""" from io import BytesIO data = { 'file': (BytesIO(sample_yaml.encode('utf-8')), 'test.yaml') } response = client.post('/api/configs/upload-yaml', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 200 result = response.get_json() assert result['success'] is True assert 'filename' in result def test_upload_yaml_no_file(self, client, auth_headers): """Test uploading without file""" response = client.post('/api/configs/upload-yaml', data={}, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 400 def test_upload_yaml_invalid_syntax(self, client, auth_headers): """Test uploading YAML with invalid syntax""" from io import BytesIO invalid_yaml = "invalid: yaml: syntax: [" data = { 'file': (BytesIO(invalid_yaml.encode('utf-8')), 'test.yaml') } response = client.post('/api/configs/upload-yaml', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 400 def test_upload_yaml_missing_required_fields(self, client, auth_headers): """Test uploading YAML missing required fields""" from io import BytesIO invalid_yaml = """sites: - name: Test ips: - address: 10.0.0.1 """ data = { 'file': (BytesIO(invalid_yaml.encode('utf-8')), 'test.yaml') } response = client.post('/api/configs/upload-yaml', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 400 def test_upload_yaml_wrong_extension(self, client, auth_headers): """Test uploading file with wrong extension""" from io import BytesIO data = { 'file': (BytesIO(b'test'), 'test.txt') } response = client.post('/api/configs/upload-yaml', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 400 def test_upload_yaml_requires_auth(self, client, sample_yaml): """Test that uploading YAML requires authentication""" from io import BytesIO data = { 'file': (BytesIO(sample_yaml.encode('utf-8')), 'test.yaml') } response = client.post('/api/configs/upload-yaml', data=data, content_type='multipart/form-data') assert response.status_code in [401, 302] class TestDownloadTemplate: """Tests for GET /api/configs/template""" def test_download_template(self, client, auth_headers): """Test downloading CSV template""" response = client.get('/api/configs/template', headers=auth_headers) assert response.status_code == 200 assert response.content_type == 'text/csv; charset=utf-8' assert b'scan_title,site_name,ip_address' in response.data def test_download_template_requires_auth(self, client): """Test that downloading template requires authentication""" response = client.get('/api/configs/template') assert response.status_code in [401, 302] class TestDownloadConfig: """Tests for GET /api/configs//download""" def test_download_config_valid(self, client, auth_headers, app, sample_yaml): """Test downloading existing config""" # Create a config file temp_configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs') config_path = os.path.join(temp_configs_dir, 'test-scan.yaml') with open(config_path, 'w') as f: f.write(sample_yaml) response = client.get('/api/configs/test-scan.yaml/download', headers=auth_headers) assert response.status_code == 200 assert response.content_type == 'application/x-yaml; charset=utf-8' assert b'title: Test Scan' in response.data def test_download_config_not_found(self, client, auth_headers): """Test downloading non-existent config""" response = client.get('/api/configs/nonexistent.yaml/download', headers=auth_headers) assert response.status_code == 404 def test_download_config_requires_auth(self, client): """Test that downloading config requires authentication""" response = client.get('/api/configs/test.yaml/download') assert response.status_code in [401, 302] class TestDeleteConfig: """Tests for DELETE /api/configs/""" def test_delete_config_valid(self, client, auth_headers, app, sample_yaml): """Test deleting a config file""" # Create a config file temp_configs_dir = os.environ.get('CONFIGS_DIR', '/app/configs') config_path = os.path.join(temp_configs_dir, 'test-scan.yaml') with open(config_path, 'w') as f: f.write(sample_yaml) response = client.delete('/api/configs/test-scan.yaml', headers=auth_headers) assert response.status_code == 200 data = response.get_json() assert data['success'] is True # Verify file is deleted assert not os.path.exists(config_path) def test_delete_config_not_found(self, client, auth_headers): """Test deleting non-existent config""" response = client.delete('/api/configs/nonexistent.yaml', headers=auth_headers) assert response.status_code == 404 def test_delete_config_requires_auth(self, client): """Test that deleting config requires authentication""" response = client.delete('/api/configs/test.yaml') assert response.status_code in [401, 302] class TestEndToEndWorkflow: """End-to-end workflow tests""" def test_complete_csv_workflow(self, client, auth_headers, sample_csv): """Test complete CSV upload workflow""" from io import BytesIO # 1. Download template response = client.get('/api/configs/template', headers=auth_headers) assert response.status_code == 200 # 2. Upload CSV data = { 'file': (BytesIO(sample_csv.encode('utf-8')), 'workflow-test.csv') } response = client.post('/api/configs/upload-csv', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 200 result = response.get_json() filename = result['filename'] # 3. List configs (should include new one) response = client.get('/api/configs', headers=auth_headers) assert response.status_code == 200 configs = response.get_json()['configs'] assert any(c['filename'] == filename for c in configs) # 4. Get config details response = client.get(f'/api/configs/{filename}', headers=auth_headers) assert response.status_code == 200 # 5. Download config response = client.get(f'/api/configs/{filename}/download', headers=auth_headers) assert response.status_code == 200 # 6. Delete config response = client.delete(f'/api/configs/{filename}', headers=auth_headers) assert response.status_code == 200 # 7. Verify deletion response = client.get(f'/api/configs/{filename}', headers=auth_headers) assert response.status_code == 404 def test_yaml_upload_workflow(self, client, auth_headers, sample_yaml): """Test YAML upload workflow""" from io import BytesIO # Upload YAML data = { 'file': (BytesIO(sample_yaml.encode('utf-8')), 'yaml-workflow.yaml') } response = client.post('/api/configs/upload-yaml', data=data, headers=auth_headers, content_type='multipart/form-data') assert response.status_code == 200 filename = response.get_json()['filename'] # Verify it exists response = client.get(f'/api/configs/{filename}', headers=auth_headers) assert response.status_code == 200 # Clean up client.delete(f'/api/configs/{filename}', headers=auth_headers)