""" Config Service - Business logic for config management This service handles all operations related to scan configurations, both database-stored (primary) and file-based (deprecated). """ import os from typing import Dict, List, Any, Optional from datetime import datetime from sqlalchemy.orm import Session class ConfigService: """Business logic for config management""" def __init__(self, db_session: Session = None, configs_dir: str = '/app/configs'): """ Initialize the config service. Args: db_session: SQLAlchemy database session (for database operations) configs_dir: Directory where legacy config files are stored """ self.db = db_session self.configs_dir = configs_dir # Ensure configs directory exists (for legacy YAML configs) os.makedirs(self.configs_dir, exist_ok=True) # ============================================================================ # Database-based Config Operations (Primary) # ============================================================================ def create_config(self, title: str, description: Optional[str], site_ids: List[int]) -> Dict[str, Any]: """ Create a new scan configuration in the database. Args: title: Configuration title description: Optional configuration description site_ids: List of site IDs to include in this config Returns: Created config as dictionary: { "id": 1, "title": "Production Scan", "description": "...", "site_count": 3, "sites": [...], "created_at": "2025-11-19T10:30:00Z", "updated_at": "2025-11-19T10:30:00Z" } Raises: ValueError: If validation fails or sites don't exist """ if not title or not title.strip(): raise ValueError("Title is required") if not site_ids or len(site_ids) == 0: raise ValueError("At least one site must be selected") # Import models here to avoid circular imports from web.models import ScanConfig, ScanConfigSite, Site # Verify all sites exist existing_sites = self.db.query(Site).filter(Site.id.in_(site_ids)).all() if len(existing_sites) != len(site_ids): found_ids = {s.id for s in existing_sites} missing_ids = set(site_ids) - found_ids raise ValueError(f"Sites not found: {missing_ids}") # Create config config = ScanConfig( title=title.strip(), description=description.strip() if description else None, created_at=datetime.utcnow(), updated_at=datetime.utcnow() ) self.db.add(config) self.db.flush() # Get the config ID # Create associations for site_id in site_ids: assoc = ScanConfigSite( config_id=config.id, site_id=site_id, created_at=datetime.utcnow() ) self.db.add(assoc) self.db.commit() return self.get_config_by_id(config.id) def get_config_by_id(self, config_id: int) -> Dict[str, Any]: """ Get a scan configuration by ID. Args: config_id: Configuration ID Returns: Config as dictionary with sites Raises: ValueError: If config not found """ from web.models import ScanConfig config = self.db.query(ScanConfig).filter_by(id=config_id).first() if not config: raise ValueError(f"Config with ID {config_id} not found") # Get associated sites sites = [] for assoc in config.site_associations: site = assoc.site sites.append({ 'id': site.id, 'name': site.name, 'description': site.description, 'ip_count': len(site.ips) }) return { 'id': config.id, 'title': config.title, 'description': config.description, 'site_count': len(sites), 'sites': sites, 'created_at': config.created_at.isoformat() + 'Z' if config.created_at else None, 'updated_at': config.updated_at.isoformat() + 'Z' if config.updated_at else None } def list_configs_db(self) -> List[Dict[str, Any]]: """ List all scan configurations from database. Returns: List of config dictionaries with metadata """ from web.models import ScanConfig configs = self.db.query(ScanConfig).order_by(ScanConfig.updated_at.desc()).all() result = [] for config in configs: sites = [] for assoc in config.site_associations: site = assoc.site sites.append({ 'id': site.id, 'name': site.name }) result.append({ 'id': config.id, 'title': config.title, 'description': config.description, 'site_count': len(sites), 'sites': sites, 'created_at': config.created_at.isoformat() + 'Z' if config.created_at else None, 'updated_at': config.updated_at.isoformat() + 'Z' if config.updated_at else None }) return result def update_config(self, config_id: int, title: Optional[str], description: Optional[str], site_ids: Optional[List[int]]) -> Dict[str, Any]: """ Update a scan configuration. Args: config_id: Configuration ID to update title: New title (optional) description: New description (optional) site_ids: New list of site IDs (optional, replaces existing) Returns: Updated config dictionary Raises: ValueError: If config not found or validation fails """ from web.models import ScanConfig, ScanConfigSite, Site config = self.db.query(ScanConfig).filter_by(id=config_id).first() if not config: raise ValueError(f"Config with ID {config_id} not found") # Update fields if provided if title is not None: if not title.strip(): raise ValueError("Title cannot be empty") config.title = title.strip() if description is not None: config.description = description.strip() if description.strip() else None # Update sites if provided if site_ids is not None: if len(site_ids) == 0: raise ValueError("At least one site must be selected") # Verify all sites exist existing_sites = self.db.query(Site).filter(Site.id.in_(site_ids)).all() if len(existing_sites) != len(site_ids): found_ids = {s.id for s in existing_sites} missing_ids = set(site_ids) - found_ids raise ValueError(f"Sites not found: {missing_ids}") # Remove existing associations self.db.query(ScanConfigSite).filter_by(config_id=config_id).delete() # Create new associations for site_id in site_ids: assoc = ScanConfigSite( config_id=config_id, site_id=site_id, created_at=datetime.utcnow() ) self.db.add(assoc) config.updated_at = datetime.utcnow() self.db.commit() return self.get_config_by_id(config_id) def delete_config(self, config_id: int) -> None: """ Delete a scan configuration from database. This will cascade delete associated ScanConfigSite records. Schedules and scans referencing this config will have their config_id set to NULL. Args: config_id: Configuration ID to delete Raises: ValueError: If config not found """ from web.models import ScanConfig config = self.db.query(ScanConfig).filter_by(id=config_id).first() if not config: raise ValueError(f"Config with ID {config_id} not found") self.db.delete(config) self.db.commit() def add_site_to_config(self, config_id: int, site_id: int) -> Dict[str, Any]: """ Add a site to an existing config. Args: config_id: Configuration ID site_id: Site ID to add Returns: Updated config dictionary Raises: ValueError: If config or site not found, or association already exists """ from web.models import ScanConfig, Site, ScanConfigSite config = self.db.query(ScanConfig).filter_by(id=config_id).first() if not config: raise ValueError(f"Config with ID {config_id} not found") site = self.db.query(Site).filter_by(id=site_id).first() if not site: raise ValueError(f"Site with ID {site_id} not found") # Check if association already exists existing = self.db.query(ScanConfigSite).filter_by( config_id=config_id, site_id=site_id ).first() if existing: raise ValueError(f"Site '{site.name}' is already in this config") # Create association assoc = ScanConfigSite( config_id=config_id, site_id=site_id, created_at=datetime.utcnow() ) self.db.add(assoc) config.updated_at = datetime.utcnow() self.db.commit() return self.get_config_by_id(config_id) def remove_site_from_config(self, config_id: int, site_id: int) -> Dict[str, Any]: """ Remove a site from a config. Args: config_id: Configuration ID site_id: Site ID to remove Returns: Updated config dictionary Raises: ValueError: If config not found, or removing would leave config empty """ from web.models import ScanConfig, ScanConfigSite config = self.db.query(ScanConfig).filter_by(id=config_id).first() if not config: raise ValueError(f"Config with ID {config_id} not found") # Check if this would leave the config empty current_site_count = len(config.site_associations) if current_site_count <= 1: raise ValueError("Cannot remove last site from config. Delete the config instead.") # Remove association deleted = self.db.query(ScanConfigSite).filter_by( config_id=config_id, site_id=site_id ).delete() if deleted == 0: raise ValueError(f"Site with ID {site_id} is not in this config") config.updated_at = datetime.utcnow() self.db.commit() return self.get_config_by_id(config_id)