"""
Config Service - Business logic for config file management

This service handles all operations related to scan configuration files,
including creation, validation, listing, and deletion.
"""

import ipaddress
import logging
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml
from werkzeug.utils import secure_filename

logger = logging.getLogger(__name__)


class ConfigService:
    """Business logic for config management.

    All config files live directly under ``configs_dir``. Methods that take a
    caller-supplied ``filename`` for an *existing* config refuse any name that
    would resolve outside that directory.
    """

    # Largest network (in addresses) that create_from_cidr will expand.
    MAX_CIDR_ADDRESSES = 10000

    # Page size used to fetch "all" schedules from the schedule service.
    _SCHEDULE_PAGE_SIZE = 10000

    # File suffixes recognised as config files.
    _YAML_EXTENSIONS = ('.yaml', '.yml')

    def __init__(self, configs_dir: str = '/app/configs'):
        """
        Initialize the config service.

        Args:
            configs_dir: Directory where config files are stored. It is
                created if it does not exist yet.
        """
        self.configs_dir = configs_dir

        # Ensure configs directory exists
        os.makedirs(self.configs_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _resolve_path(self, filename: str) -> Optional[str]:
        """
        Map a caller-supplied filename to a path inside the configs directory.

        Returns:
            The joined path, or None when the name is empty, not a string,
            contains a NUL byte, or would resolve outside ``configs_dir``
            (e.g. via ``..`` components or an unrelated absolute path).
        """
        if not isinstance(filename, str) or not filename or '\x00' in filename:
            return None

        root = os.path.abspath(self.configs_dir)
        joined = os.path.join(self.configs_dir, filename)
        candidate = os.path.abspath(joined)

        try:
            contained = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Raised e.g. for paths on different drives (Windows).
            return None

        # The directory itself is never a valid config file.
        if not contained or candidate == root:
            return None
        return joined

    def _require_existing(self, filename: str) -> str:
        """Return the path of an existing config file or raise FileNotFoundError."""
        filepath = self._resolve_path(filename)
        if filepath is None or not os.path.isfile(filepath):
            raise FileNotFoundError(f"Config file '{filename}' not found")
        return filepath

    def _parse_and_validate(self, content: str) -> Any:
        """Parse YAML text and validate its structure, raising ValueError on failure."""
        try:
            parsed = yaml.safe_load(content)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML syntax: {str(e)}") from e

        is_valid, error_msg = self.validate_config_content(parsed)
        if not is_valid:
            raise ValueError(f"Invalid config structure: {error_msg}")
        return parsed

    def _write_new_file(self, filepath: str, filename: str, content: str) -> None:
        """
        Create a new file, failing instead of overwriting an existing one.

        Mode 'x' makes the existence check and the creation a single step, so
        a file created concurrently between the caller's check and this write
        is not silently overwritten.
        """
        try:
            with open(filepath, 'x', encoding='utf-8') as f:
                f.write(content)
        except FileExistsError:
            raise ValueError(f"Config file '{filename}' already exists") from None

    def _read_title(self, filepath: str, fallback: str) -> Any:
        """Return the 'title' value of a YAML file, or ``fallback`` if unavailable."""
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except Exception:
            # Best effort: an unreadable/unparsable file is still listed.
            logger.debug("Could not read title from '%s'", filepath, exc_info=True)
            return fallback

        if isinstance(data, dict) and data.get('title'):
            return data['title']
        return fallback

    def _find_schedules(self, filename: str) -> Tuple[Any, List[Dict[str, Any]]]:
        """
        Fetch the schedules (enabled or not) that reference a config file.

        Returns:
            Tuple of (schedule_service, matching_schedules).

        Raises:
            ImportError: If the schedule service or Flask is not importable.
        """
        # Imported here to avoid a circular dependency.
        from web.services.schedule_service import ScheduleService
        from flask import current_app

        # Get database session from Flask app
        schedule_service = ScheduleService(current_app.db_session)
        result = schedule_service.list_schedules(
            page=1, per_page=self._SCHEDULE_PAGE_SIZE
        )
        schedules = result.get('schedules', [])

        # Schedules may store either the bare filename or the full path.
        config_path = os.path.join(self.configs_dir, filename)
        matching = [
            schedule for schedule in schedules
            if schedule.get('config_file', '') in (filename, config_path)
        ]
        return schedule_service, matching

    def _cascade_delete_schedules(self, filename: str) -> None:
        """Delete every schedule that references ``filename``, logging the outcome."""
        schedule_service, schedules = self._find_schedules(filename)

        deleted_schedules = []
        for schedule in schedules:
            schedule_id = schedule.get('id')
            schedule_name = schedule.get('name', 'Unknown')
            try:
                schedule_service.delete_schedule(schedule_id)
                deleted_schedules.append(schedule_name)
            except Exception as e:
                # One failing schedule must not stop the remaining deletions.
                logger.warning(
                    "Failed to delete schedule %s ('%s'): %s",
                    schedule_id, schedule_name, e
                )

        if deleted_schedules:
            logger.info(
                "Cascade deleted %d schedule(s) associated with config '%s': %s",
                len(deleted_schedules), filename,
                ', '.join(map(str, deleted_schedules))
            )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def list_configs(self) -> List[Dict[str, Any]]:
        """
        List all config files with metadata.

        Files that cannot be inspected are skipped (and logged). The result
        is ordered by modification time, most recent first.

        Returns:
            List of config metadata dictionaries:
            [
                {
                    "filename": "prod-scan.yaml",
                    "title": "Prod Scan",
                    "path": "/app/configs/prod-scan.yaml",
                    "created_at": "2025-11-15T10:30:00Z",
                    "size_bytes": 1234,
                    "used_by_schedules": ["Daily Scan", "Weekly Audit"]
                }
            ]

            "created_at" is derived from the file's modification time and is
            expressed in UTC. "title" falls back to the filename when the
            file has no usable title.
        """
        if not os.path.isdir(self.configs_dir):
            return []

        entries = []
        for filename in os.listdir(self.configs_dir):
            if not filename.endswith(self._YAML_EXTENSIONS):
                continue

            filepath = os.path.join(self.configs_dir, filename)
            if not os.path.isfile(filepath):
                continue

            try:
                stat_info = os.stat(filepath)
                # Convert to UTC before appending 'Z'; a naive local
                # timestamp would be mislabelled as UTC.
                modified = datetime.fromtimestamp(stat_info.st_mtime, tz=timezone.utc)
                entry = {
                    'filename': filename,
                    'title': self._read_title(filepath, fallback=filename),
                    'path': filepath,
                    'created_at': modified.replace(tzinfo=None).isoformat() + 'Z',
                    'size_bytes': stat_info.st_size,
                    'used_by_schedules': self.get_schedules_using_config(filename),
                }
            except Exception:
                # Skip files that can't be read, but leave a trace.
                logger.warning(
                    "Skipping unreadable config file '%s'", filename, exc_info=True
                )
                continue

            entries.append((stat_info.st_mtime, entry))

        # Sort on the numeric timestamp: ISO strings omit zero microseconds,
        # so comparing them as text does not always give chronological order.
        entries.sort(key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in entries]

    def get_config(self, filename: str) -> Dict[str, Any]:
        """
        Get config file content and parsed data.

        Args:
            filename: Config filename

        Returns:
            Dictionary with keys "filename" (as given), "content" (raw file
            text) and "parsed" (the result of parsing the YAML).

        Raises:
            FileNotFoundError: If config doesn't exist, or the name does not
                refer to a file inside the configs directory
            ValueError: If config content is invalid
        """
        filepath = self._require_existing(filename)

        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()

        try:
            parsed = yaml.safe_load(content)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML syntax: {str(e)}") from e

        return {
            'filename': filename,
            'content': content,
            'parsed': parsed
        }

    def create_from_yaml(self, filename: str, content: str) -> str:
        """
        Create config from YAML content.

        Args:
            filename: Desired filename (will be sanitized)
            content: YAML content string

        Returns:
            Final filename (sanitized)

        Raises:
            ValueError: If content invalid, filename conflict, or the
                filename has no usable characters after sanitization
        """
        filename = secure_filename(filename)
        if not filename:
            # Without this check the file would be created as a bare ".yaml".
            raise ValueError("Invalid filename: no usable characters after sanitization")

        # Ensure .yaml extension
        if not filename.endswith(self._YAML_EXTENSIONS):
            filename += '.yaml'

        filepath = os.path.join(self.configs_dir, filename)

        # Report conflicts before content errors.
        if os.path.exists(filepath):
            raise ValueError(f"Config file '{filename}' already exists")

        self._parse_and_validate(content)
        self._write_new_file(filepath, filename, content)

        return filename

    def create_from_cidr(
        self,
        title: str,
        cidr: str,
        site_name: Optional[str] = None,
        ping_default: bool = False
    ) -> Tuple[str, str]:
        """
        Create config from CIDR range.

        Args:
            title: Scan configuration title
            cidr: CIDR range (e.g., "10.0.0.0/24")
            site_name: Optional site name (defaults to "Site 1")
            ping_default: Default ping expectation for all IPs

        Returns:
            Tuple of (final_filename, yaml_content)

        Raises:
            ValueError: If CIDR invalid, the range is too large, the title is
                empty, or a config with the generated filename already exists
        """
        try:
            network = ipaddress.ip_network(cidr, strict=False)
        except ValueError as e:
            raise ValueError(f"Invalid CIDR range: {str(e)}") from e

        # Prevent expansion of huge ranges
        if network.num_addresses > self.MAX_CIDR_ADDRESSES:
            raise ValueError(
                f"CIDR range too large: {network.num_addresses} addresses. "
                f"Maximum is {self.MAX_CIDR_ADDRESSES:,}."
            )

        # A blank title would produce a config that fails validation.
        if not isinstance(title, str) or not title.strip():
            raise ValueError("Title must be a non-empty string")

        ip_list = [str(ip) for ip in network.hosts()]

        # hosts() may be empty for a single-address network (/32 or /128);
        # in that case use the network address itself.
        if not ip_list:
            ip_list = [str(network.network_address)]

        if not site_name or not site_name.strip():
            site_name = "Site 1"

        # Each IP gets its own fresh 'expected' dict and port lists.
        ips = [
            {
                'address': ip_address,
                'expected': {
                    'ping': ping_default,
                    'tcp_ports': [],
                    'udp_ports': []
                }
            }
            for ip_address in ip_list
        ]

        config_data = {
            'title': title.strip(),
            'sites': [
                {
                    'name': site_name.strip(),
                    'ips': ips
                }
            ]
        }

        yaml_content = yaml.dump(config_data, sort_keys=False, default_flow_style=False)

        filename = self.generate_filename_from_title(title)
        filepath = os.path.join(self.configs_dir, filename)

        if os.path.exists(filepath):
            raise ValueError(f"Config file '{filename}' already exists")

        self._write_new_file(filepath, filename, yaml_content)

        return filename, yaml_content

    def update_config(self, filename: str, yaml_content: str) -> None:
        """
        Update existing config file with new YAML content.

        Args:
            filename: Config filename to update
            yaml_content: New YAML content string

        Raises:
            FileNotFoundError: If config doesn't exist, or the name does not
                refer to a file inside the configs directory
            ValueError: If YAML content is invalid
        """
        filepath = self._require_existing(filename)

        # Validate before touching the file so a bad update leaves it intact.
        self._parse_and_validate(yaml_content)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(yaml_content)

    def delete_config(self, filename: str) -> None:
        """
        Delete config file and cascade delete any associated schedules.

        When a config is deleted, all schedules using that config (both
        enabled and disabled) are deleted as well, since they would be
        invalid without the config file. Failures while deleting schedules
        are logged and do not prevent the config file from being removed.

        Args:
            filename: Config filename to delete

        Raises:
            FileNotFoundError: If config doesn't exist, or the name does not
                refer to a file inside the configs directory
        """
        filepath = self._require_existing(filename)

        try:
            self._cascade_delete_schedules(filename)
        except ImportError:
            # If ScheduleService doesn't exist yet, skip schedule deletion
            pass
        except Exception as e:
            # Log error but continue with config deletion
            logger.error(
                "Error deleting schedules for config %s: %s", filename, e,
                exc_info=True
            )

        os.remove(filepath)

    def validate_config_content(self, content: Any) -> Tuple[bool, str]:
        """
        Validate parsed YAML config structure.

        Args:
            content: Parsed YAML config (expected to be a dict)

        Returns:
            Tuple of (is_valid, error_message); error_message is "" when the
            content is valid
        """
        if not isinstance(content, dict):
            return False, "Config must be a dictionary/object"

        for field in ('title', 'sites'):
            if field not in content:
                return False, f"Missing required field: '{field}'"

        title = content['title']
        if not isinstance(title, str) or not title.strip():
            return False, "Field 'title' must be a non-empty string"

        sites = content['sites']
        if not isinstance(sites, list):
            return False, "Field 'sites' must be a list"
        if not sites:
            return False, "Must have at least one site defined"

        for i, site in enumerate(sites, start=1):
            if not isinstance(site, dict):
                return False, f"Site {i} must be a dictionary/object"

            for field in ('name', 'ips'):
                if field not in site:
                    return False, f"Site {i} missing required field: '{field}'"

            if not isinstance(site['ips'], list):
                return False, f"Site {i} field 'ips' must be a list"
            if not site['ips']:
                return False, f"Site {i} must have at least one IP"

            for j, ip_config in enumerate(site['ips'], start=1):
                if not isinstance(ip_config, dict):
                    return False, f"Site {i} IP {j} must be a dictionary/object"

                for field in ('address', 'expected'):
                    if field not in ip_config:
                        return False, f"Site {i} IP {j} missing required field: '{field}'"

                if not isinstance(ip_config['expected'], dict):
                    return False, f"Site {i} IP {j} field 'expected' must be a dictionary/object"

        return True, ""

    def get_schedules_using_config(self, filename: str) -> List[str]:
        """
        Get list of schedule names using this config.

        Only enabled schedules are reported. Any failure while querying
        schedules yields an empty list rather than an exception.

        Args:
            filename: Config filename

        Returns:
            List of schedule names (e.g., ["Daily Scan", "Weekly Audit"])
        """
        try:
            _, schedules = self._find_schedules(filename)
            return [
                schedule.get('name', 'Unknown')
                for schedule in schedules
                if schedule.get('enabled', False)
            ]
        except ImportError:
            # If ScheduleService doesn't exist yet, return empty list
            return []
        except Exception as e:
            # Returning an empty list is safer than failing the caller.
            logger.error(
                "Error getting schedules using config %s: %s", filename, e,
                exc_info=True
            )
            return []

    def generate_filename_from_title(self, title: str) -> str:
        """
        Generate safe filename from scan title.

        Args:
            title: Scan title string

        Returns:
            Safe filename (e.g., "Prod Scan 2025" -> "prod-scan-2025.yaml");
            "config.yaml" when the title contains no usable characters
        """
        filename = title.lower().replace(' ', '-')

        # Keep only alphanumerics, hyphens and underscores
        filename = re.sub(r'[^a-z0-9\-_]', '', filename)

        # Collapse runs of hyphens and trim them from both ends
        filename = re.sub(r'-+', '-', filename).strip('-')

        # Limit length (max 200 chars, reserve 5 for .yaml)
        max_length = 195
        if len(filename) > max_length:
            # Truncation can expose a hyphen at the end; trim it again.
            filename = filename[:max_length].rstrip('-')

        if not filename:
            filename = 'config'

        return filename + '.yaml'

    def get_config_path(self, filename: str) -> str:
        """
        Get the path for a config file.

        This is a plain join of the configs directory and ``filename``; it
        neither validates the name nor checks that the file exists.

        Args:
            filename: Config filename

        Returns:
            Path to config file under the configs directory
        """
        return os.path.join(self.configs_dir, filename)

    def config_exists(self, filename: str) -> bool:
        """
        Check if a config file exists.

        Args:
            filename: Config filename

        Returns:
            True if the name refers to an existing regular file inside the
            configs directory, False otherwise
        """
        filepath = self._resolve_path(filename)
        return filepath is not None and os.path.isfile(filepath)