From fdf689316f2dbbf9cb595a43a8fb1a839092515b Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Wed, 19 Nov 2025 21:27:05 -0600 Subject: [PATCH 1/2] code cleanup, UI change to menu to make it cleaner --- app/templates/report_mockup.html | 1424 -------------------------- app/tests/test_scan_api.py | 18 +- app/tests/test_schedule_api.py | 22 +- app/web/routes/main.py | 47 - app/web/services/config_service.py | 630 +----------- app/web/services/site_service.py | 1 - app/web/templates/base.html | 18 +- app/web/templates/config_edit.html | 263 ----- app/web/templates/config_upload.html | 415 -------- app/web/utils/validators.py | 263 +---- 10 files changed, 32 insertions(+), 3069 deletions(-) delete mode 100644 app/templates/report_mockup.html delete mode 100644 app/web/templates/config_edit.html delete mode 100644 app/web/templates/config_upload.html diff --git a/app/templates/report_mockup.html b/app/templates/report_mockup.html deleted file mode 100644 index e226a3e..0000000 --- a/app/templates/report_mockup.html +++ /dev/null @@ -1,1424 +0,0 @@ - - - - - - SneakyScanner Report - Production Infrastructure Scan - - - -
- -
-

Production Infrastructure Scan

-
- 📅 Scan Time: 2025-11-14 10:30:00 UTC - ⏱️ Duration: 127.5 seconds - 📄 Config: /app/configs/production.yaml -
-
- - -
-

Scan Summary

-
- -
-

Scan Statistics

-
-
- Total IPs Scanned - 5 -
-
- TCP Ports Found - 47 -
-
- UDP Ports Found - 3 -
-
- Services Identified - 42 -
-
- Web Services - 12 -
-
- Screenshots Captured - 11 -
-
-
- - -
-

Drift Alerts

-
-
- Unexpected TCP Ports - 4 -
-
- Missing Expected Services - 2 -
-
- New Services Detected - 3 -
-
- Unexpected UDP Ports - 1 -
-
-
- - -
-

Security Warnings

-
-
- Certificates Expiring Soon (<30 days) - 2 -
-
- Weak TLS Versions (1.0/1.1) - 1 -
-
- Self-Signed Certificates - 3 -
-
- High Port Services (>10000) - 5 -
-
-
-
-
- - -
- - - -
-
-

192.168.1.10

-
- Ping: Expected - 2 Unexpected Ports -
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
PortProtocolServiceProductStatus
22TCPsshOpenSSHExpected
-
-
-
- Product - OpenSSH -
-
- Version - 8.2p1 Ubuntu 4ubuntu0.5 -
-
- OS Type - Linux -
-
- Extra Info - protocol 2.0 -
-
-
-
80TCPhttpnginxExpected
-
-
-
- Product - nginx -
-
- Version - 1.18.0 (Ubuntu) -
-
- Protocol - HTTP -
-
- - 🖼️ View Screenshot - -
-
443TCPhttpsnginxExpected
-
-
-
- Product - nginx -
-
- Version - 1.18.0 -
-
- Protocol - HTTPS -
-
- - 🖼️ View Screenshot - - - -
-
-

🔒 SSL/TLS Details

- Click to expand ▼ -
-
- -
Certificate Information
-
-
- Subject - CN=prod.example.com -
-
- Issuer - CN=Let's Encrypt Authority X3 -
-
- Valid From - 2025-01-01 00:00:00 UTC -
-
- Valid Until - 2025-04-01 23:59:59 UTC -
-
- Days Until Expiry - 138 days -
-
- Serial Number - 03:AB:CD:EF:12:34:56:78 -
-
- -
- Subject Alternative Names (SANs) - prod.example.com, www.prod.example.com, api.example.com -
- - -
-
TLS Version Support
- -
-
- TLS 1.0 - Not Supported -
-
- -
-
- TLS 1.1 - Not Supported -
-
- -
-
- TLS 1.2 - Supported -
-
    -
  • TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
  • -
  • TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
  • -
  • TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
  • -
-
- -
-
- TLS 1.3 - Supported -
-
    -
  • TLS_AES_256_GCM_SHA384
  • -
  • TLS_AES_128_GCM_SHA256
  • -
  • TLS_CHACHA20_POLY1305_SHA256
  • -
-
-
-
-
-
-
3000TCPhttpNode.js ExpressUnexpected
-
-
-
- Product - Node.js Express -
-
- Version - 4.18.2 -
-
- Protocol - HTTP -
-
- ⚠️ Status - Not in expected ports list -
-
- - 🖼️ View Screenshot - -
-
8080TCPhttp-proxyTomcatUnexpected
-
-
-
- Product - Apache Tomcat -
-
- Version - 9.0.65 -
-
- Protocol - HTTP -
-
- ⚠️ Status - Not in expected ports list -
-
- - 🖼️ View Screenshot - -
-
-
- - -
-
-

192.168.1.11

-
- Ping: Expected - All Ports Expected -
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
PortProtocolServiceProductStatus
22TCPsshOpenSSHExpected
-
-
-
- Product - OpenSSH -
-
- Version - 8.9p1 Ubuntu 3ubuntu0.1 -
-
- OS Type - Linux -
-
-
-
443TCPhttpsApacheExpected
-
-
-
- Product - Apache httpd -
-
- Version - 2.4.52 -
-
- Protocol - HTTPS -
-
- - 🖼️ View Screenshot - - - -
-
-

🔒 SSL/TLS Details Certificate Expiring Soon

- Click to expand ▼ -
-
-
Certificate Information
-
-
- Subject - CN=backup.example.com -
-
- Issuer - CN=DigiCert TLS RSA SHA256 2020 CA1 -
-
- Valid From - 2024-10-15 00:00:00 UTC -
-
- Valid Until - 2025-11-29 23:59:59 UTC -
-
- Days Until Expiry - 15 days ⚠️ -
-
- Serial Number - 09:FE:DC:BA:98:76:54:32 -
-
- -
- Subject Alternative Names (SANs) - backup.example.com -
- -
-
TLS Version Support
- -
-
- TLS 1.0 - Supported (Weak) ⚠️ -
-
    -
  • TLS_RSA_WITH_AES_128_CBC_SHA
  • -
  • TLS_RSA_WITH_AES_256_CBC_SHA
  • -
-
- -
-
- TLS 1.2 - Supported -
-
    -
  • TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
  • -
  • TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
  • -
-
- -
-
- TLS 1.3 - Not Supported -
-
-
-
-
-
-
53UDPdomainISC BINDExpected
-
-
-
- Product - ISC BIND -
-
- Version - 9.16.1-Ubuntu -
-
- Service Type - DNS Server -
-
-
-
-
- - -
-
-

192.168.1.12

-
- Ping: Expected - 1 Missing Service -
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
PortProtocolServiceProductStatus
22TCPsshOpenSSHExpected
-
-
-
- Product - OpenSSH -
-
- Version - 8.2p1 -
-
-
-
443TCP❌ Expected but not foundMissing
-
-
- - -
- - - -
-
-

192.168.1.50

-
- Ping: Expected - All Ports Expected -
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
PortProtocolServiceProductStatus
22TCPsshOpenSSHExpected
-
-
-
- Product - OpenSSH -
-
- Version - 8.4p1 Debian 5+deb11u1 -
-
-
-
8006TCPhttpsProxmox VEExpected
-
-
-
- Product - Proxmox Virtual Environment -
-
- Version - 7.4-1 -
-
- Protocol - HTTPS -
-
- Service Type - Virtualization Management -
-
- - 🖼️ View Screenshot - - - -
-
-

🔒 SSL/TLS Details Self-Signed Certificate

- Click to expand ▼ -
-
-
Certificate Information
-
-
- Subject - CN=proxmox.local -
-
- Issuer - CN=proxmox.local (Self-Signed) -
-
- Valid From - 2024-01-10 12:00:00 UTC -
-
- Valid Until - 2034-01-08 12:00:00 UTC -
-
- Days Until Expiry - 3342 days -
-
- Serial Number - 01:23:45:67:89:AB:CD:EF -
-
- -
-
TLS Version Support
- -
-
- TLS 1.0 - Not Supported -
-
- -
-
- TLS 1.1 - Not Supported -
-
- -
-
- TLS 1.2 - Supported -
-
    -
  • TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
  • -
  • TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
  • -
-
- -
-
- TLS 1.3 - Supported -
-
    -
  • TLS_AES_256_GCM_SHA384
  • -
  • TLS_CHACHA20_POLY1305_SHA256
  • -
-
-
-
-
-
-
-
- - -
-
-

192.168.1.51

-
- Ping: Expected - All Ports Expected -
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
PortProtocolServiceProductStatus
22TCPsshOpenSSHExpected
-
-
-
- Product - OpenSSH -
-
- Version - 8.9p1 Ubuntu 3ubuntu0.6 -
-
-
-
5432TCPpostgresqlPostgreSQLExpected
-
-
-
- Product - PostgreSQL DB -
-
- Version - 14.7 -
-
- Extra Info - Debian 14.7-1.pgdg110+1 -
-
- OS Type - Linux -
-
-
-
6379TCPredisRedisExpected
-
-
-
- Product - Redis key-value store -
-
- Version - 7.0.8 -
-
- Extra Info - 64-bit -
-
-
-
-
-
- -
- - - - diff --git a/app/tests/test_scan_api.py b/app/tests/test_scan_api.py index 648ed29..5927fd5 100644 --- a/app/tests/test_scan_api.py +++ b/app/tests/test_scan_api.py @@ -37,7 +37,7 @@ class TestScanAPIEndpoints: assert len(data['scans']) == 1 assert data['scans'][0]['id'] == sample_scan.id - def test_list_scans_pagination(self, client, db): + def test_list_scans_pagination(self, client, db, sample_db_config): """Test scan list pagination.""" # Create 25 scans for i in range(25): @@ -126,7 +126,7 @@ class TestScanAPIEndpoints: def test_trigger_scan_success(self, client, db, sample_db_config): """Test triggering a new scan.""" response = client.post('/api/scans', - json={'config_file': str(sample_db_config)}, + json={'config_id': sample_db_config.id}, content_type='application/json' ) assert response.status_code == 201 @@ -142,8 +142,8 @@ class TestScanAPIEndpoints: assert scan.status == 'running' assert scan.triggered_by == 'api' - def test_trigger_scan_missing_config_file(self, client, db): - """Test triggering scan without config_file.""" + def test_trigger_scan_missing_config_id(self, client, db): + """Test triggering scan without config_id.""" response = client.post('/api/scans', json={}, content_type='application/json' @@ -152,12 +152,12 @@ class TestScanAPIEndpoints: data = json.loads(response.data) assert 'error' in data - assert 'config_file is required' in data['message'] + assert 'config_id is required' in data['message'] - def test_trigger_scan_invalid_config_file(self, client, db): - """Test triggering scan with non-existent config file.""" + def test_trigger_scan_invalid_config_id(self, client, db): + """Test triggering scan with non-existent config.""" response = client.post('/api/scans', - json={'config_file': '/nonexistent/config.yaml'}, + json={'config_id': 99999}, content_type='application/json' ) assert response.status_code == 400 @@ -231,7 +231,7 @@ class TestScanAPIEndpoints: """ # Step 1: Trigger scan response = client.post('/api/scans', - json={'config_file': str(sample_db_config)}, + json={'config_id': sample_db_config.id}, content_type='application/json' ) assert response.status_code == 201 diff --git a/app/tests/test_schedule_api.py b/app/tests/test_schedule_api.py index 2212f7a..34910cf 100644 --- a/app/tests/test_schedule_api.py +++ b/app/tests/test_schedule_api.py @@ -151,7 +151,7 @@ class TestScheduleAPIEndpoints: data = json.loads(response.data) assert data['id'] == sample_schedule.id assert data['name'] == sample_schedule.name - assert data['config_file'] == sample_schedule.config_file + assert data['config_id'] == sample_schedule.config_id assert data['cron_expression'] == sample_schedule.cron_expression assert data['enabled'] == sample_schedule.enabled assert 'history' in data @@ -169,7 +169,7 @@ class TestScheduleAPIEndpoints: """Test creating a new schedule.""" schedule_data = { 'name': 'New Test Schedule', - 'config_file': sample_db_config, + 'config_id': sample_db_config.id, 'cron_expression': '0 3 * * *', 'enabled': True } @@ -197,7 +197,7 @@ class TestScheduleAPIEndpoints: # Missing cron_expression schedule_data = { 'name': 'Incomplete Schedule', - 'config_file': '/app/configs/test.yaml' + 'config_id': 1 } response = client.post( @@ -215,7 +215,7 @@ class TestScheduleAPIEndpoints: """Test creating schedule with invalid cron expression.""" schedule_data = { 'name': 'Invalid Cron Schedule', - 'config_file': sample_db_config, + 'config_id': sample_db_config.id, 'cron_expression': 'invalid cron' } @@ -231,10 +231,10 @@ class TestScheduleAPIEndpoints: assert 'invalid' in data['error'].lower() or 'cron' in data['error'].lower() def test_create_schedule_invalid_config(self, client, db): - """Test creating schedule with non-existent config file.""" + """Test creating schedule with non-existent config.""" schedule_data = { 'name': 'Invalid Config Schedule', - 'config_file': '/nonexistent/config.yaml', + 'config_id': 99999, 'cron_expression': '0 2 * * *' } @@ -399,7 +399,7 @@ class TestScheduleAPIEndpoints: assert scan is not None assert scan.triggered_by == 'manual' assert scan.schedule_id == sample_schedule.id - assert scan.config_file == sample_schedule.config_file + assert scan.config_id == sample_schedule.config_id def test_trigger_schedule_not_found(self, client, db): """Test triggering non-existent schedule.""" @@ -436,7 +436,7 @@ class TestScheduleAPIEndpoints: # 1. Create schedule schedule_data = { 'name': 'Integration Test Schedule', - 'config_file': sample_db_config, + 'config_id': sample_db_config.id, 'cron_expression': '0 2 * * *', 'enabled': True } @@ -527,7 +527,7 @@ class TestScheduleAPIEndpoints: """Test creating a disabled schedule.""" schedule_data = { 'name': 'Disabled Schedule', - 'config_file': sample_db_config, + 'config_id': sample_db_config.id, 'cron_expression': '0 2 * * *', 'enabled': False } @@ -600,7 +600,7 @@ class TestScheduleAPICronValidation: for cron_expr in valid_expressions: schedule_data = { 'name': f'Schedule for {cron_expr}', - 'config_file': sample_db_config, + 'config_id': sample_db_config.id, 'cron_expression': cron_expr } @@ -626,7 +626,7 @@ class TestScheduleAPICronValidation: for cron_expr in invalid_expressions: schedule_data = { 'name': f'Schedule for {cron_expr}', - 'config_file': sample_db_config, + 'config_id': sample_db_config.id, 'cron_expression': cron_expr } diff --git a/app/web/routes/main.py b/app/web/routes/main.py index 6a55880..ac15a97 100644 --- a/app/web/routes/main.py +++ b/app/web/routes/main.py @@ -128,8 +128,6 @@ def edit_schedule(schedule_id): Returns: Rendered schedule edit template """ - from flask import flash - # Note: Schedule data is loaded via AJAX in the template # This just renders the page with the schedule_id in the URL return render_template('schedule_edit.html', schedule_id=schedule_id) @@ -159,51 +157,6 @@ def configs(): return render_template('configs.html') -@bp.route('/configs/upload') -@login_required -def upload_config(): - """ - Config upload page - allows CIDR/YAML upload. - - Returns: - Rendered config upload template - """ - return render_template('config_upload.html') - - -@bp.route('/configs/edit/') -@login_required -def edit_config(filename): - """ - Config edit page - allows editing YAML configuration. - - Args: - filename: Config filename to edit - - Returns: - Rendered config edit template - """ - from web.services.config_service import ConfigService - from flask import flash, redirect - - try: - config_service = ConfigService() - config_data = config_service.get_config(filename) - - return render_template( - 'config_edit.html', - filename=config_data['filename'], - content=config_data['content'] - ) - except FileNotFoundError: - flash(f"Config file '{filename}' not found", 'error') - return redirect(url_for('main.configs')) - except Exception as e: - logger.error(f"Error loading config for edit: {e}") - flash(f"Error loading config: {str(e)}", 'error') - return redirect(url_for('main.configs')) - - @bp.route('/alerts') @login_required def alerts(): diff --git a/app/web/services/config_service.py b/app/web/services/config_service.py index 6188454..0146c6f 100644 --- a/app/web/services/config_service.py +++ b/app/web/services/config_service.py @@ -6,13 +6,8 @@ both database-stored (primary) and file-based (deprecated). """ import os -import re -import yaml -import ipaddress -from typing import Dict, List, Tuple, Any, Optional +from typing import Dict, List, Any, Optional from datetime import datetime -from pathlib import Path -from werkzeug.utils import secure_filename from sqlalchemy.orm import Session @@ -342,626 +337,3 @@ class ConfigService: self.db.commit() return self.get_config_by_id(config_id) - - # ============================================================================ - # Legacy YAML File Operations (Deprecated) - # ============================================================================ - - def list_configs_file(self) -> List[Dict[str, Any]]: - """ - [DEPRECATED] List all config files with metadata. - - Returns: - List of config metadata dictionaries: - [ - { - "filename": "prod-scan.yaml", - "title": "Prod Scan", - "path": "/app/configs/prod-scan.yaml", - "created_at": "2025-11-15T10:30:00Z", - "size_bytes": 1234, - "used_by_schedules": ["Daily Scan", "Weekly Audit"] - } - ] - """ - configs = [] - - # Get all YAML files in configs directory - if not os.path.exists(self.configs_dir): - return configs - - for filename in os.listdir(self.configs_dir): - if not filename.endswith(('.yaml', '.yml')): - continue - - filepath = os.path.join(self.configs_dir, filename) - - if not os.path.isfile(filepath): - continue - - try: - # Get file metadata - stat_info = os.stat(filepath) - created_at = datetime.fromtimestamp(stat_info.st_mtime).isoformat() + 'Z' - size_bytes = stat_info.st_size - - # Parse YAML to get title - title = None - try: - with open(filepath, 'r') as f: - data = yaml.safe_load(f) - if isinstance(data, dict): - title = data.get('title', filename) - except Exception: - title = filename # Fallback to filename if parsing fails - - # Get schedules using this config - used_by_schedules = self.get_schedules_using_config(filename) - - configs.append({ - 'filename': filename, - 'title': title, - 'path': filepath, - 'created_at': created_at, - 'size_bytes': size_bytes, - 'used_by_schedules': used_by_schedules - }) - except Exception as e: - # Skip files that can't be read - continue - - # Sort by created_at (most recent first) - configs.sort(key=lambda x: x['created_at'], reverse=True) - - return configs - - def get_config(self, filename: str) -> Dict[str, Any]: - """ - Get config file content and parsed data. - - Args: - filename: Config filename - - Returns: - { - "filename": "prod-scan.yaml", - "content": "title: Prod Scan\n...", - "parsed": {"title": "Prod Scan", "sites": [...]} - } - - Raises: - FileNotFoundError: If config doesn't exist - ValueError: If config content is invalid - """ - filepath = os.path.join(self.configs_dir, filename) - - if not os.path.exists(filepath): - raise FileNotFoundError(f"Config file '{filename}' not found") - - # Read file content - with open(filepath, 'r') as f: - content = f.read() - - # Parse YAML - try: - parsed = yaml.safe_load(content) - except yaml.YAMLError as e: - raise ValueError(f"Invalid YAML syntax: {str(e)}") - - return { - 'filename': filename, - 'content': content, - 'parsed': parsed - } - - def create_from_yaml(self, filename: str, content: str) -> str: - """ - Create config from YAML content. - - Args: - filename: Desired filename (will be sanitized) - content: YAML content string - - Returns: - Final filename (sanitized) - - Raises: - ValueError: If content invalid or filename conflict - """ - # Sanitize filename - filename = secure_filename(filename) - - # Ensure .yaml extension - if not filename.endswith(('.yaml', '.yml')): - filename += '.yaml' - - filepath = os.path.join(self.configs_dir, filename) - - # Check for conflicts - if os.path.exists(filepath): - raise ValueError(f"Config file '{filename}' already exists") - - # Parse and validate YAML - try: - parsed = yaml.safe_load(content) - except yaml.YAMLError as e: - raise ValueError(f"Invalid YAML syntax: {str(e)}") - - # Validate config structure - is_valid, error_msg = self.validate_config_content(parsed) - if not is_valid: - raise ValueError(f"Invalid config structure: {error_msg}") - - # Create inline sites in database (if any) - self.create_inline_sites(parsed) - - # Write file - with open(filepath, 'w') as f: - f.write(content) - - return filename - - def create_from_cidr( - self, - title: str, - cidr: str, - site_name: Optional[str] = None, - ping_default: bool = False - ) -> Tuple[str, str]: - """ - Create config from CIDR range. - - Args: - title: Scan configuration title - cidr: CIDR range (e.g., "10.0.0.0/24") - site_name: Optional site name (defaults to "Site 1") - ping_default: Default ping expectation for all IPs - - Returns: - Tuple of (final_filename, yaml_content) - - Raises: - ValueError: If CIDR invalid or other validation errors - """ - # Validate and parse CIDR - try: - network = ipaddress.ip_network(cidr, strict=False) - except ValueError as e: - raise ValueError(f"Invalid CIDR range: {str(e)}") - - # Check if network is too large (prevent expansion of huge ranges) - if network.num_addresses > 10000: - raise ValueError(f"CIDR range too large: {network.num_addresses} addresses. Maximum is 10,000.") - - # Expand CIDR to list of IP addresses - ip_list = [str(ip) for ip in network.hosts()] - - # If network has only 1 address (like /32 or /128), hosts() returns empty - # In that case, use the network address itself - if not ip_list: - ip_list = [str(network.network_address)] - - # Build site name - if not site_name or not site_name.strip(): - site_name = "Site 1" - - # Build IP configurations - ips = [] - for ip_address in ip_list: - ips.append({ - 'address': ip_address, - 'expected': { - 'ping': ping_default, - 'tcp_ports': [], - 'udp_ports': [] - } - }) - - # Build YAML structure - config_data = { - 'title': title.strip(), - 'sites': [ - { - 'name': site_name.strip(), - 'ips': ips - } - ] - } - - # Convert to YAML string - yaml_content = yaml.dump(config_data, sort_keys=False, default_flow_style=False) - - # Generate filename from title - filename = self.generate_filename_from_title(title) - - filepath = os.path.join(self.configs_dir, filename) - - # Check for conflicts - if os.path.exists(filepath): - raise ValueError(f"Config file '{filename}' already exists") - - # Write file - with open(filepath, 'w') as f: - f.write(yaml_content) - - return filename, yaml_content - - def update_config_file(self, filename: str, yaml_content: str) -> None: - """ - [DEPRECATED] Update existing config file with new YAML content. - - Args: - filename: Config filename to update - yaml_content: New YAML content string - - Raises: - FileNotFoundError: If config doesn't exist - ValueError: If YAML content is invalid - """ - filepath = os.path.join(self.configs_dir, filename) - - # Check if file exists - if not os.path.exists(filepath): - raise FileNotFoundError(f"Config file '{filename}' not found") - - # Parse and validate YAML - try: - parsed = yaml.safe_load(yaml_content) - except yaml.YAMLError as e: - raise ValueError(f"Invalid YAML syntax: {str(e)}") - - # Validate config structure - is_valid, error_msg = self.validate_config_content(parsed) - if not is_valid: - raise ValueError(f"Invalid config structure: {error_msg}") - - # Write updated content - with open(filepath, 'w') as f: - f.write(yaml_content) - - def delete_config_file(self, filename: str) -> None: - """ - [DEPRECATED] Delete config file and cascade delete any associated schedules. - - When a config is deleted, all schedules using that config (both enabled - and disabled) are automatically deleted as well, since they would be - invalid without the config file. - - Args: - filename: Config filename to delete - - Raises: - FileNotFoundError: If config doesn't exist - """ - filepath = os.path.join(self.configs_dir, filename) - - if not os.path.exists(filepath): - raise FileNotFoundError(f"Config file '{filename}' not found") - - # Delete any schedules using this config (both enabled and disabled) - try: - from web.services.schedule_service import ScheduleService - from flask import current_app - - # Get database session from Flask app - db = current_app.db_session - - # Get all schedules - schedule_service = ScheduleService(db) - result = schedule_service.list_schedules(page=1, per_page=10000) - schedules = result.get('schedules', []) - - # Build full path for comparison - config_path = os.path.join(self.configs_dir, filename) - - # Note: This function is deprecated. Schedules now use config_id. - # This code path should not be reached for new configs. - deleted_schedules = [] - import logging - logging.getLogger(__name__).warning( - f"delete_config_file called for '{filename}' - this is deprecated. Use database configs with config_id instead." - ) - - if deleted_schedules: - import logging - logging.getLogger(__name__).info( - f"Cascade deleted {len(deleted_schedules)} schedule(s) associated with config '{filename}': {', '.join(deleted_schedules)}" - ) - - except ImportError: - # If ScheduleService doesn't exist yet, skip schedule deletion - pass - except Exception as e: - # Log error but continue with config deletion - import logging - logging.getLogger(__name__).error( - f"Error deleting schedules for config {filename}: {e}", exc_info=True - ) - - # Delete file - os.remove(filepath) - - def validate_config_content(self, content: Dict, check_site_refs: bool = True) -> Tuple[bool, str]: - """ - Validate parsed YAML config structure. - - Supports both legacy format (inline IPs) and new format (site references or CIDRs). - - Args: - content: Parsed YAML config as dict - check_site_refs: If True, validates that referenced sites exist in database - - Returns: - Tuple of (is_valid, error_message) - """ - if not isinstance(content, dict): - return False, "Config must be a dictionary/object" - - # Check required fields - if 'title' not in content: - return False, "Missing required field: 'title'" - - if 'sites' not in content: - return False, "Missing required field: 'sites'" - - # Validate title - if not isinstance(content['title'], str) or not content['title'].strip(): - return False, "Field 'title' must be a non-empty string" - - # Validate sites - sites = content['sites'] - if not isinstance(sites, list): - return False, "Field 'sites' must be a list" - - if len(sites) == 0: - return False, "Must have at least one site defined" - - # Validate each site - for i, site in enumerate(sites): - if not isinstance(site, dict): - return False, f"Site {i+1} must be a dictionary/object" - - # Check if this is a site reference (new format) - if 'site_ref' in site: - # Site reference format - site_ref = site.get('site_ref') - if not isinstance(site_ref, str) or not site_ref.strip(): - return False, f"Site {i+1} field 'site_ref' must be a non-empty string" - - # Validate site reference exists (if check enabled) - if check_site_refs: - try: - from web.services.site_service import SiteService - from flask import current_app - - site_service = SiteService(current_app.db_session) - referenced_site = site_service.get_site_by_name(site_ref) - if not referenced_site: - return False, f"Site {i+1}: Referenced site '{site_ref}' does not exist" - except Exception as e: - # If we can't check (e.g., outside app context), skip validation - pass - - continue # Site reference is valid - - # Check if this is inline site creation with CIDRs (new format) - if 'cidrs' in site: - # Inline site creation with CIDR format - if 'name' not in site: - return False, f"Site {i+1} with inline CIDRs missing required field: 'name'" - - cidrs = site.get('cidrs') - if not isinstance(cidrs, list): - return False, f"Site {i+1} field 'cidrs' must be a list" - - if len(cidrs) == 0: - return False, f"Site {i+1} must have at least one CIDR" - - # Validate each CIDR - for j, cidr_config in enumerate(cidrs): - if not isinstance(cidr_config, dict): - return False, f"Site {i+1} CIDR {j+1} must be a dictionary/object" - - if 'cidr' not in cidr_config: - return False, f"Site {i+1} CIDR {j+1} missing required field: 'cidr'" - - # Validate CIDR format - cidr_str = cidr_config.get('cidr') - try: - ipaddress.ip_network(cidr_str, strict=False) - except ValueError: - return False, f"Site {i+1} CIDR {j+1}: Invalid CIDR notation '{cidr_str}'" - - continue # Inline CIDR site is valid - - # Legacy format: inline IPs - if 'name' not in site: - return False, f"Site {i+1} missing required field: 'name'" - - if 'ips' not in site: - return False, f"Site {i+1} missing required field: 'ips' (or use 'site_ref' or 'cidrs')" - - if not isinstance(site['ips'], list): - return False, f"Site {i+1} field 'ips' must be a list" - - if len(site['ips']) == 0: - return False, f"Site {i+1} must have at least one IP" - - # Validate each IP - for j, ip_config in enumerate(site['ips']): - if not isinstance(ip_config, dict): - return False, f"Site {i+1} IP {j+1} must be a dictionary/object" - - if 'address' not in ip_config: - return False, f"Site {i+1} IP {j+1} missing required field: 'address'" - - if 'expected' not in ip_config: - return False, f"Site {i+1} IP {j+1} missing required field: 'expected'" - - if not isinstance(ip_config['expected'], dict): - return False, f"Site {i+1} IP {j+1} field 'expected' must be a dictionary/object" - - return True, "" - - def get_schedules_using_config(self, filename: str) -> List[str]: - """ - Get list of schedule names using this config. - - Args: - filename: Config filename - - Returns: - List of schedule names (e.g., ["Daily Scan", "Weekly Audit"]) - """ - # Import here to avoid circular dependency - try: - from web.services.schedule_service import ScheduleService - from flask import current_app - - # Get database session from Flask app - db = current_app.db_session - - # Get all schedules (use large per_page to get all) - schedule_service = ScheduleService(db) - result = schedule_service.list_schedules(page=1, per_page=10000) - - # Extract schedules list from paginated result - schedules = result.get('schedules', []) - - # Build full path for comparison - config_path = os.path.join(self.configs_dir, filename) - - # Note: This function is deprecated. Schedules now use config_id. - # Return empty list as schedules no longer use config_file. - return [] - - except ImportError: - # If ScheduleService doesn't exist yet, return empty list - return [] - except Exception as e: - # If any error occurs, return empty list (safer than failing) - # Log the error for debugging - import logging - logging.getLogger(__name__).error(f"Error getting schedules using config {filename}: {e}", exc_info=True) - return [] - - def generate_filename_from_title(self, title: str) -> str: - """ - Generate safe filename from scan title. - - Args: - title: Scan title string - - Returns: - Safe filename (e.g., "Prod Scan 2025" -> "prod-scan-2025.yaml") - """ - # Convert to lowercase - filename = title.lower() - - # Replace spaces with hyphens - filename = filename.replace(' ', '-') - - # Remove special characters (keep only alphanumeric, hyphens, underscores) - filename = re.sub(r'[^a-z0-9\-_]', '', filename) - - # Remove consecutive hyphens - filename = re.sub(r'-+', '-', filename) - - # Remove leading/trailing hyphens - filename = filename.strip('-') - - # Limit length (max 200 chars, reserve 5 for .yaml) - max_length = 195 - if len(filename) > max_length: - filename = filename[:max_length] - - # Ensure not empty - if not filename: - filename = 'config' - - # Add .yaml extension - filename += '.yaml' - - return filename - - def get_config_path(self, filename: str) -> str: - """ - Get absolute path for a config file. - - Args: - filename: Config filename - - Returns: - Absolute path to config file - """ - return os.path.join(self.configs_dir, filename) - - def config_exists(self, filename: str) -> bool: - """ - Check if a config file exists. - - Args: - filename: Config filename - - Returns: - True if file exists, False otherwise - """ - filepath = os.path.join(self.configs_dir, filename) - return os.path.exists(filepath) and os.path.isfile(filepath) - - def create_inline_sites(self, config_content: Dict) -> None: - """ - Create sites in the database for inline site definitions in a config. - - This method scans the config for inline site definitions (with CIDRs) - and creates them as reusable sites in the database if they don't already exist. - - Args: - config_content: Parsed YAML config dictionary - - Raises: - ValueError: If site creation fails - """ - try: - from web.services.site_service import SiteService - from flask import current_app - - site_service = SiteService(current_app.db_session) - - sites = config_content.get('sites', []) - - for site_def in sites: - # Skip site references (they already exist) - if 'site_ref' in site_def: - continue - - # Skip legacy IP-based sites (not creating those as reusable sites) - if 'ips' in site_def and 'cidrs' not in site_def: - continue - - # Process inline CIDR-based sites - if 'cidrs' in site_def: - site_name = site_def.get('name') - - # Check if site already exists - existing_site = site_service.get_site_by_name(site_name) - if existing_site: - # Site already exists, skip creation - continue - - # Create new site - cidrs = site_def.get('cidrs', []) - description = f"Auto-created from config '{config_content.get('title', 'Unknown')}'" - - site_service.create_site( - name=site_name, - description=description, - cidrs=cidrs - ) - - except Exception as e: - # If site creation fails, log but don't block config creation - import logging - logging.getLogger(__name__).warning( - f"Failed to create inline sites from config: {str(e)}" - ) diff --git a/app/web/services/site_service.py b/app/web/services/site_service.py index 646c334..f4a4fb7 100644 --- a/app/web/services/site_service.py +++ b/app/web/services/site_service.py @@ -13,7 +13,6 @@ from typing import Any, Dict, List, Optional from sqlalchemy import func from sqlalchemy.orm import Session, joinedload -from sqlalchemy.exc import IntegrityError from web.models import ( Site, SiteIP, ScanSiteAssociation diff --git a/app/web/templates/base.html b/app/web/templates/base.html index e069617..872f3f1 100644 --- a/app/web/templates/base.html +++ b/app/web/templates/base.html @@ -45,6 +45,16 @@ Dashboard + - -