340 lines
11 KiB
Python
340 lines
11 KiB
Python
"""
Config Service - Business logic for config management

This service handles all operations related to scan configurations,
both database-stored (primary) and file-based (deprecated).
"""
|
|
|
|
import os
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
class ConfigService:
    """Business logic for scan configuration management.

    Wraps a SQLAlchemy session and exposes create/read/update/delete
    operations for scan configs and their site associations. Results are
    returned as plain dictionaries; validation and lookup failures are
    reported as ``ValueError``.
    """

    def __init__(self, db_session: Optional["Session"] = None, configs_dir: str = '/app/configs'):
        """
        Initialize the config service.

        Args:
            db_session: SQLAlchemy database session (for database operations).
                The annotation is a quoted forward reference so it is not
                evaluated when the class is defined.
            configs_dir: Directory where legacy config files are stored

        Raises:
            OSError: If the configs directory cannot be created.
        """
        self.db = db_session
        self.configs_dir = configs_dir

        # Ensure configs directory exists (for legacy YAML configs)
        os.makedirs(self.configs_dir, exist_ok=True)

    # ============================================================================
    # Internal helpers
    # ============================================================================

    @staticmethod
    def _format_timestamp(value: Optional[datetime]) -> Optional[str]:
        """Return ``value`` as an ISO-8601 string with a 'Z' suffix, or None."""
        return value.isoformat() + 'Z' if value else None

    @staticmethod
    def _clean_description(description: Optional[str]) -> Optional[str]:
        """Strip a description; empty or whitespace-only text becomes None."""
        if description is None:
            return None
        return description.strip() or None

    @staticmethod
    def _normalize_site_ids(site_ids: Optional[List[int]]) -> List[int]:
        """Return ``site_ids`` de-duplicated in first-seen order.

        Raises:
            ValueError: If ``site_ids`` is None or empty.
        """
        if not site_ids:
            raise ValueError("At least one site must be selected")
        # dict.fromkeys keeps insertion order, so the caller's ordering survives.
        return list(dict.fromkeys(site_ids))

    @classmethod
    def _serialize_config(cls, config: Any, *, detailed: bool) -> Dict[str, Any]:
        """Convert a config object into the dictionary shape returned to callers.

        Args:
            config: Object exposing ``id``, ``title``, ``description``,
                ``created_at``, ``updated_at`` and ``site_associations``
                (each association exposing ``site``).
            detailed: When True, each site entry also carries ``description``
                and ``ip_count``; otherwise only ``id`` and ``name``.
        """
        sites = []
        for assoc in config.site_associations:
            site = assoc.site
            entry = {'id': site.id, 'name': site.name}
            if detailed:
                entry['description'] = site.description
                entry['ip_count'] = len(site.ips)
            sites.append(entry)

        return {
            'id': config.id,
            'title': config.title,
            'description': config.description,
            'site_count': len(sites),
            'sites': sites,
            'created_at': cls._format_timestamp(config.created_at),
            'updated_at': cls._format_timestamp(config.updated_at),
        }

    def _load_config(self, config_id: int) -> Any:
        """Fetch a ScanConfig row by ID.

        Raises:
            ValueError: If no config with that ID exists.
        """
        # Import models here to avoid circular imports
        from web.models import ScanConfig

        config = self.db.query(ScanConfig).filter_by(id=config_id).first()
        if not config:
            raise ValueError(f"Config with ID {config_id} not found")
        return config

    def _require_existing_sites(self, site_ids: List[int]) -> None:
        """Verify that every ID in ``site_ids`` refers to a stored Site.

        Raises:
            ValueError: Listing the IDs that were not found.
        """
        # Import models here to avoid circular imports
        from web.models import Site

        rows = self.db.query(Site).filter(Site.id.in_(site_ids)).all()
        # Decide on the set difference rather than on list lengths, so that
        # repeated IDs in the request can never be reported as "missing".
        missing_ids = set(site_ids) - {row.id for row in rows}
        if missing_ids:
            raise ValueError(f"Sites not found: {missing_ids}")

    # ============================================================================
    # Database-based Config Operations (Primary)
    # ============================================================================

    def create_config(self, title: str, description: Optional[str], site_ids: List[int]) -> Dict[str, Any]:
        """
        Create a new scan configuration in the database.

        Duplicate entries in ``site_ids`` are ignored. A whitespace-only
        description is stored as None, matching ``update_config``.

        Args:
            title: Configuration title
            description: Optional configuration description
            site_ids: List of site IDs to include in this config

        Returns:
            Created config as dictionary:
            {
                "id": 1,
                "title": "Production Scan",
                "description": "...",
                "site_count": 3,
                "sites": [...],
                "created_at": "2025-11-19T10:30:00Z",
                "updated_at": "2025-11-19T10:30:00Z"
            }

        Raises:
            ValueError: If validation fails or sites don't exist
        """
        # Pure input validation first: no database access is needed for it.
        if not title or not title.strip():
            raise ValueError("Title is required")
        unique_site_ids = self._normalize_site_ids(site_ids)

        # Import models here to avoid circular imports
        from web.models import ScanConfig, ScanConfigSite

        self._require_existing_sites(unique_site_ids)

        now = datetime.utcnow()
        config = ScanConfig(
            title=title.strip(),
            description=self._clean_description(description),
            created_at=now,
            updated_at=now,
        )

        try:
            self.db.add(config)
            self.db.flush()  # Get the config ID

            for site_id in unique_site_ids:
                self.db.add(ScanConfigSite(
                    config_id=config.id,
                    site_id=site_id,
                    created_at=now,
                ))

            self.db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed write.
            self.db.rollback()
            raise

        return self.get_config_by_id(config.id)

    def get_config_by_id(self, config_id: int) -> Dict[str, Any]:
        """
        Get a scan configuration by ID.

        Args:
            config_id: Configuration ID

        Returns:
            Config as dictionary with sites (each site includes id, name,
            description and ip_count)

        Raises:
            ValueError: If config not found
        """
        return self._serialize_config(self._load_config(config_id), detailed=True)

    def list_configs_db(self) -> List[Dict[str, Any]]:
        """
        List all scan configurations from database, most recently updated first.

        Returns:
            List of config dictionaries with metadata (each site includes
            only id and name)
        """
        # Import models here to avoid circular imports
        from web.models import ScanConfig

        configs = self.db.query(ScanConfig).order_by(ScanConfig.updated_at.desc()).all()
        return [self._serialize_config(config, detailed=False) for config in configs]

    def update_config(
        self,
        config_id: int,
        title: Optional[str],
        description: Optional[str],
        site_ids: Optional[List[int]],
    ) -> Dict[str, Any]:
        """
        Update a scan configuration.

        All inputs are validated before anything is modified, so a rejected
        update leaves no partial changes pending in the session.

        Args:
            config_id: Configuration ID to update
            title: New title (optional; None leaves it unchanged)
            description: New description (optional; None leaves it unchanged,
                blank text clears it)
            site_ids: New list of site IDs (optional, replaces existing;
                duplicates are ignored)

        Returns:
            Updated config dictionary

        Raises:
            ValueError: If config not found or validation fails
        """
        # Import models here to avoid circular imports
        from web.models import ScanConfigSite

        config = self._load_config(config_id)

        # --- Validate everything up front -----------------------------------
        new_title = None
        if title is not None:
            new_title = title.strip()
            if not new_title:
                raise ValueError("Title cannot be empty")

        unique_site_ids = None
        if site_ids is not None:
            unique_site_ids = self._normalize_site_ids(site_ids)
            self._require_existing_sites(unique_site_ids)

        # --- Apply the changes ----------------------------------------------
        try:
            if new_title is not None:
                config.title = new_title

            if description is not None:
                config.description = self._clean_description(description)

            if unique_site_ids is not None:
                # Replace the association set wholesale.
                self.db.query(ScanConfigSite).filter_by(config_id=config_id).delete()
                now = datetime.utcnow()
                for site_id in unique_site_ids:
                    self.db.add(ScanConfigSite(
                        config_id=config_id,
                        site_id=site_id,
                        created_at=now,
                    ))

            config.updated_at = datetime.utcnow()
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed write.
            self.db.rollback()
            raise

        return self.get_config_by_id(config_id)

    def delete_config(self, config_id: int) -> None:
        """
        Delete a scan configuration from database.

        NOTE(review): the original documentation states that associated
        ScanConfigSite records are cascade-deleted and that schedules and
        scans referencing this config get their config_id set to NULL. That
        behavior is defined by the models in web.models, not here — confirm
        against the model definitions.

        Args:
            config_id: Configuration ID to delete

        Raises:
            ValueError: If config not found
        """
        config = self._load_config(config_id)

        try:
            self.db.delete(config)
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed write.
            self.db.rollback()
            raise

    def add_site_to_config(self, config_id: int, site_id: int) -> Dict[str, Any]:
        """
        Add a site to an existing config.

        Args:
            config_id: Configuration ID
            site_id: Site ID to add

        Returns:
            Updated config dictionary

        Raises:
            ValueError: If config or site not found, or association already exists
        """
        # Import models here to avoid circular imports
        from web.models import Site, ScanConfigSite

        config = self._load_config(config_id)

        site = self.db.query(Site).filter_by(id=site_id).first()
        if not site:
            raise ValueError(f"Site with ID {site_id} not found")

        # Check if association already exists
        existing = self.db.query(ScanConfigSite).filter_by(
            config_id=config_id, site_id=site_id
        ).first()
        if existing:
            raise ValueError(f"Site '{site.name}' is already in this config")

        try:
            self.db.add(ScanConfigSite(
                config_id=config_id,
                site_id=site_id,
                created_at=datetime.utcnow(),
            ))
            config.updated_at = datetime.utcnow()
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed write.
            self.db.rollback()
            raise

        return self.get_config_by_id(config_id)

    def remove_site_from_config(self, config_id: int, site_id: int) -> Dict[str, Any]:
        """
        Remove a site from a config.

        Args:
            config_id: Configuration ID
            site_id: Site ID to remove

        Returns:
            Updated config dictionary

        Raises:
            ValueError: If config not found, the site is not in the config,
                or removing would leave config empty
        """
        # Import models here to avoid circular imports
        from web.models import ScanConfigSite

        config = self._load_config(config_id)

        membership = self.db.query(ScanConfigSite).filter_by(
            config_id=config_id, site_id=site_id
        )

        # Check membership first so a site that is not part of the config is
        # reported as such, rather than as an attempt to remove the last site.
        if membership.first() is None:
            raise ValueError(f"Site with ID {site_id} is not in this config")

        # Check if this would leave the config empty
        if len(config.site_associations) <= 1:
            raise ValueError("Cannot remove last site from config. Delete the config instead.")

        try:
            membership.delete()
            config.updated_at = datetime.utcnow()
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed write.
            self.db.rollback()
            raise

        return self.get_config_by_id(config_id)