""" Config Service - Business logic for config file management This service handles all operations related to scan configuration files, including creation, validation, listing, and deletion. """ import os import re import yaml import ipaddress from typing import Dict, List, Tuple, Any, Optional from datetime import datetime from pathlib import Path from werkzeug.utils import secure_filename class ConfigService: """Business logic for config management""" def __init__(self, configs_dir: str = '/app/configs'): """ Initialize the config service. Args: configs_dir: Directory where config files are stored """ self.configs_dir = configs_dir # Ensure configs directory exists os.makedirs(self.configs_dir, exist_ok=True) def list_configs(self) -> List[Dict[str, Any]]: """ List all config files with metadata. Returns: List of config metadata dictionaries: [ { "filename": "prod-scan.yaml", "title": "Prod Scan", "path": "/app/configs/prod-scan.yaml", "created_at": "2025-11-15T10:30:00Z", "size_bytes": 1234, "used_by_schedules": ["Daily Scan", "Weekly Audit"] } ] """ configs = [] # Get all YAML files in configs directory if not os.path.exists(self.configs_dir): return configs for filename in os.listdir(self.configs_dir): if not filename.endswith(('.yaml', '.yml')): continue filepath = os.path.join(self.configs_dir, filename) if not os.path.isfile(filepath): continue try: # Get file metadata stat_info = os.stat(filepath) created_at = datetime.fromtimestamp(stat_info.st_mtime).isoformat() + 'Z' size_bytes = stat_info.st_size # Parse YAML to get title title = None try: with open(filepath, 'r') as f: data = yaml.safe_load(f) if isinstance(data, dict): title = data.get('title', filename) except Exception: title = filename # Fallback to filename if parsing fails # Get schedules using this config used_by_schedules = self.get_schedules_using_config(filename) configs.append({ 'filename': filename, 'title': title, 'path': filepath, 'created_at': created_at, 'size_bytes': size_bytes, 'used_by_schedules': used_by_schedules }) except Exception as e: # Skip files that can't be read continue # Sort by created_at (most recent first) configs.sort(key=lambda x: x['created_at'], reverse=True) return configs def get_config(self, filename: str) -> Dict[str, Any]: """ Get config file content and parsed data. Args: filename: Config filename Returns: { "filename": "prod-scan.yaml", "content": "title: Prod Scan\n...", "parsed": {"title": "Prod Scan", "sites": [...]} } Raises: FileNotFoundError: If config doesn't exist ValueError: If config content is invalid """ filepath = os.path.join(self.configs_dir, filename) if not os.path.exists(filepath): raise FileNotFoundError(f"Config file '{filename}' not found") # Read file content with open(filepath, 'r') as f: content = f.read() # Parse YAML try: parsed = yaml.safe_load(content) except yaml.YAMLError as e: raise ValueError(f"Invalid YAML syntax: {str(e)}") return { 'filename': filename, 'content': content, 'parsed': parsed } def create_from_yaml(self, filename: str, content: str) -> str: """ Create config from YAML content. Args: filename: Desired filename (will be sanitized) content: YAML content string Returns: Final filename (sanitized) Raises: ValueError: If content invalid or filename conflict """ # Sanitize filename filename = secure_filename(filename) # Ensure .yaml extension if not filename.endswith(('.yaml', '.yml')): filename += '.yaml' filepath = os.path.join(self.configs_dir, filename) # Check for conflicts if os.path.exists(filepath): raise ValueError(f"Config file '{filename}' already exists") # Parse and validate YAML try: parsed = yaml.safe_load(content) except yaml.YAMLError as e: raise ValueError(f"Invalid YAML syntax: {str(e)}") # Validate config structure is_valid, error_msg = self.validate_config_content(parsed) if not is_valid: raise ValueError(f"Invalid config structure: {error_msg}") # Write file with open(filepath, 'w') as f: f.write(content) return filename def create_from_cidr( self, title: str, cidr: str, site_name: Optional[str] = None, ping_default: bool = False ) -> Tuple[str, str]: """ Create config from CIDR range. Args: title: Scan configuration title cidr: CIDR range (e.g., "10.0.0.0/24") site_name: Optional site name (defaults to "Site 1") ping_default: Default ping expectation for all IPs Returns: Tuple of (final_filename, yaml_content) Raises: ValueError: If CIDR invalid or other validation errors """ # Validate and parse CIDR try: network = ipaddress.ip_network(cidr, strict=False) except ValueError as e: raise ValueError(f"Invalid CIDR range: {str(e)}") # Check if network is too large (prevent expansion of huge ranges) if network.num_addresses > 10000: raise ValueError(f"CIDR range too large: {network.num_addresses} addresses. Maximum is 10,000.") # Expand CIDR to list of IP addresses ip_list = [str(ip) for ip in network.hosts()] # If network has only 1 address (like /32 or /128), hosts() returns empty # In that case, use the network address itself if not ip_list: ip_list = [str(network.network_address)] # Build site name if not site_name or not site_name.strip(): site_name = "Site 1" # Build IP configurations ips = [] for ip_address in ip_list: ips.append({ 'address': ip_address, 'expected': { 'ping': ping_default, 'tcp_ports': [], 'udp_ports': [] } }) # Build YAML structure config_data = { 'title': title.strip(), 'sites': [ { 'name': site_name.strip(), 'ips': ips } ] } # Convert to YAML string yaml_content = yaml.dump(config_data, sort_keys=False, default_flow_style=False) # Generate filename from title filename = self.generate_filename_from_title(title) filepath = os.path.join(self.configs_dir, filename) # Check for conflicts if os.path.exists(filepath): raise ValueError(f"Config file '{filename}' already exists") # Write file with open(filepath, 'w') as f: f.write(yaml_content) return filename, yaml_content def update_config(self, filename: str, yaml_content: str) -> None: """ Update existing config file with new YAML content. Args: filename: Config filename to update yaml_content: New YAML content string Raises: FileNotFoundError: If config doesn't exist ValueError: If YAML content is invalid """ filepath = os.path.join(self.configs_dir, filename) # Check if file exists if not os.path.exists(filepath): raise FileNotFoundError(f"Config file '{filename}' not found") # Parse and validate YAML try: parsed = yaml.safe_load(yaml_content) except yaml.YAMLError as e: raise ValueError(f"Invalid YAML syntax: {str(e)}") # Validate config structure is_valid, error_msg = self.validate_config_content(parsed) if not is_valid: raise ValueError(f"Invalid config structure: {error_msg}") # Write updated content with open(filepath, 'w') as f: f.write(yaml_content) def delete_config(self, filename: str) -> None: """ Delete config file if not used by schedules. Args: filename: Config filename to delete Raises: FileNotFoundError: If config doesn't exist ValueError: If config used by active schedules """ filepath = os.path.join(self.configs_dir, filename) if not os.path.exists(filepath): raise FileNotFoundError(f"Config file '{filename}' not found") # Check if used by schedules schedules = self.get_schedules_using_config(filename) if schedules: schedule_list = ', '.join(schedules) raise ValueError( f"Cannot delete config '{filename}' because it is used by the following schedules: {schedule_list}" ) # Delete file os.remove(filepath) def validate_config_content(self, content: Dict) -> Tuple[bool, str]: """ Validate parsed YAML config structure. Args: content: Parsed YAML config as dict Returns: Tuple of (is_valid, error_message) """ if not isinstance(content, dict): return False, "Config must be a dictionary/object" # Check required fields if 'title' not in content: return False, "Missing required field: 'title'" if 'sites' not in content: return False, "Missing required field: 'sites'" # Validate title if not isinstance(content['title'], str) or not content['title'].strip(): return False, "Field 'title' must be a non-empty string" # Validate sites sites = content['sites'] if not isinstance(sites, list): return False, "Field 'sites' must be a list" if len(sites) == 0: return False, "Must have at least one site defined" # Validate each site for i, site in enumerate(sites): if not isinstance(site, dict): return False, f"Site {i+1} must be a dictionary/object" if 'name' not in site: return False, f"Site {i+1} missing required field: 'name'" if 'ips' not in site: return False, f"Site {i+1} missing required field: 'ips'" if not isinstance(site['ips'], list): return False, f"Site {i+1} field 'ips' must be a list" if len(site['ips']) == 0: return False, f"Site {i+1} must have at least one IP" # Validate each IP for j, ip_config in enumerate(site['ips']): if not isinstance(ip_config, dict): return False, f"Site {i+1} IP {j+1} must be a dictionary/object" if 'address' not in ip_config: return False, f"Site {i+1} IP {j+1} missing required field: 'address'" if 'expected' not in ip_config: return False, f"Site {i+1} IP {j+1} missing required field: 'expected'" if not isinstance(ip_config['expected'], dict): return False, f"Site {i+1} IP {j+1} field 'expected' must be a dictionary/object" return True, "" def get_schedules_using_config(self, filename: str) -> List[str]: """ Get list of schedule names using this config. Args: filename: Config filename Returns: List of schedule names (e.g., ["Daily Scan", "Weekly Audit"]) """ # Import here to avoid circular dependency try: from web.services.schedule_service import ScheduleService schedule_service = ScheduleService() # Get all schedules schedules = schedule_service.list_schedules() # Build full path for comparison config_path = os.path.join(self.configs_dir, filename) # Find schedules using this config using_schedules = [] for schedule in schedules: schedule_config = schedule.get('config_file', '') # Handle both absolute paths and just filenames if schedule_config == filename or schedule_config == config_path: using_schedules.append(schedule.get('name', 'Unknown')) return using_schedules except ImportError: # If ScheduleService doesn't exist yet, return empty list return [] except Exception: # If any error occurs, return empty list (safer than failing) return [] def generate_filename_from_title(self, title: str) -> str: """ Generate safe filename from scan title. Args: title: Scan title string Returns: Safe filename (e.g., "Prod Scan 2025" -> "prod-scan-2025.yaml") """ # Convert to lowercase filename = title.lower() # Replace spaces with hyphens filename = filename.replace(' ', '-') # Remove special characters (keep only alphanumeric, hyphens, underscores) filename = re.sub(r'[^a-z0-9\-_]', '', filename) # Remove consecutive hyphens filename = re.sub(r'-+', '-', filename) # Remove leading/trailing hyphens filename = filename.strip('-') # Limit length (max 200 chars, reserve 5 for .yaml) max_length = 195 if len(filename) > max_length: filename = filename[:max_length] # Ensure not empty if not filename: filename = 'config' # Add .yaml extension filename += '.yaml' return filename def get_config_path(self, filename: str) -> str: """ Get absolute path for a config file. Args: filename: Config filename Returns: Absolute path to config file """ return os.path.join(self.configs_dir, filename) def config_exists(self, filename: str) -> bool: """ Check if a config file exists. Args: filename: Config filename Returns: True if file exists, False otherwise """ filepath = os.path.join(self.configs_dir, filename) return os.path.exists(filepath) and os.path.isfile(filepath)