Files
SneakyScan/app/web/services/config_service.py

340 lines
11 KiB
Python

"""
Config Service - Business logic for config management
This service handles all operations related to scan configurations,
both database-stored (primary) and file-based (deprecated).
"""
import os
from typing import Dict, List, Any, Optional
from datetime import datetime
from sqlalchemy.orm import Session
class ConfigService:
"""Business logic for config management"""
def __init__(self, db_session: Session = None, configs_dir: str = '/app/configs'):
"""
Initialize the config service.
Args:
db_session: SQLAlchemy database session (for database operations)
configs_dir: Directory where legacy config files are stored
"""
self.db = db_session
self.configs_dir = configs_dir
# Ensure configs directory exists (for legacy YAML configs)
os.makedirs(self.configs_dir, exist_ok=True)
# ============================================================================
# Database-based Config Operations (Primary)
# ============================================================================
def create_config(self, title: str, description: Optional[str], site_ids: List[int]) -> Dict[str, Any]:
    """
    Create a new scan configuration in the database.

    Args:
        title: Configuration title. Surrounding whitespace is stripped.
        description: Optional configuration description. Stripped; an
            empty or whitespace-only value is stored as None.
        site_ids: Site IDs to include in this config. Repeated IDs are
            collapsed, keeping the first occurrence of each.

    Returns:
        Created config as dictionary (as produced by ``get_config_by_id``):
        {
            "id": 1,
            "title": "Production Scan",
            "description": "...",
            "site_count": 3,
            "sites": [...],
            "created_at": "2025-11-19T10:30:00Z",
            "updated_at": "2025-11-19T10:30:00Z"
        }

    Raises:
        ValueError: If validation fails or sites don't exist.
        Exception: Any error raised while writing to the database is
            re-raised after the session has been rolled back.
    """
    if not title or not title.strip():
        raise ValueError("Title is required")
    if not site_ids:
        raise ValueError("At least one site must be selected")

    # Import models here to avoid circular imports
    from web.models import ScanConfig, ScanConfigSite, Site

    # Collapse repeated IDs while keeping caller order. Without this, an
    # input such as [1, 1] fails the existence check below even though the
    # site exists, and would otherwise create duplicate association rows.
    unique_site_ids = list(dict.fromkeys(site_ids))

    # Verify all sites exist
    existing_sites = self.db.query(Site).filter(Site.id.in_(unique_site_ids)).all()
    missing_ids = set(unique_site_ids) - {site.id for site in existing_sites}
    if missing_ids:
        raise ValueError(f"Sites not found: {missing_ids}")

    # One clock read so created_at and updated_at start out identical.
    now = datetime.utcnow()
    cleaned_description = description.strip() if description else ''

    try:
        config = ScanConfig(
            title=title.strip(),
            # Blank descriptions are stored as None, matching update_config.
            description=cleaned_description or None,
            created_at=now,
            updated_at=now,
        )
        self.db.add(config)
        self.db.flush()  # Get the config ID

        # Create associations
        for site_id in unique_site_ids:
            self.db.add(ScanConfigSite(
                config_id=config.id,
                site_id=site_id,
                created_at=now,
            ))
        self.db.commit()
    except Exception:
        # Leave the session usable for the caller after a failed write.
        self.db.rollback()
        raise

    return self.get_config_by_id(config.id)
def get_config_by_id(self, config_id: int) -> Dict[str, Any]:
    """
    Look up one scan configuration and serialize it together with its sites.

    Args:
        config_id: Configuration ID

    Returns:
        Dictionary with the config's id, title, description, site_count,
        sites, created_at and updated_at. Each entry in ``sites`` carries
        the site's id, name, description and ip_count (``len(site.ips)``).
        Timestamps are ISO strings with a trailing 'Z', or None when the
        stored value is falsy.

    Raises:
        ValueError: If config not found
    """
    from web.models import ScanConfig

    record = self.db.query(ScanConfig).filter_by(id=config_id).first()
    if not record:
        raise ValueError(f"Config with ID {config_id} not found")

    # One summary per association, in the order the relationship yields them.
    site_summaries = [
        {
            'id': linked.id,
            'name': linked.name,
            'description': linked.description,
            'ip_count': len(linked.ips)
        }
        for linked in (assoc.site for assoc in record.site_associations)
    ]

    created = record.created_at
    updated = record.updated_at
    return {
        'id': record.id,
        'title': record.title,
        'description': record.description,
        'site_count': len(site_summaries),
        'sites': site_summaries,
        'created_at': f"{created.isoformat()}Z" if created else None,
        'updated_at': f"{updated.isoformat()}Z" if updated else None
    }
def list_configs_db(self) -> List[Dict[str, Any]]:
    """
    Return every scan configuration stored in the database.

    Configs are ordered by ``updated_at`` descending. Each site entry
    carries only the site's id and name.

    Returns:
        List of config dictionaries with id, title, description,
        site_count, sites, created_at and updated_at.
    """
    from web.models import ScanConfig

    def _stamp(moment):
        # ISO string with a trailing 'Z'; None when no timestamp is stored.
        return moment.isoformat() + 'Z' if moment else None

    newest_first = self.db.query(ScanConfig).order_by(ScanConfig.updated_at.desc()).all()

    listing = []
    for entry in newest_first:
        members = [
            {'id': member.id, 'name': member.name}
            for member in (link.site for link in entry.site_associations)
        ]
        listing.append({
            'id': entry.id,
            'title': entry.title,
            'description': entry.description,
            'site_count': len(members),
            'sites': members,
            'created_at': _stamp(entry.created_at),
            'updated_at': _stamp(entry.updated_at)
        })
    return listing
def update_config(self, config_id: int, title: Optional[str], description: Optional[str], site_ids: Optional[List[int]]) -> Dict[str, Any]:
    """
    Update a scan configuration.

    All inputs are validated before the stored config is modified, so a
    rejected request leaves no pending changes in the session.

    Args:
        config_id: Configuration ID to update
        title: New title (optional). None leaves the title unchanged.
        description: New description (optional). None leaves it unchanged;
            an empty or whitespace-only value clears it to None.
        site_ids: New list of site IDs (optional, replaces existing).
            Repeated IDs are collapsed, keeping the first occurrence.

    Returns:
        Updated config dictionary (as produced by ``get_config_by_id``)

    Raises:
        ValueError: If config not found or validation fails
        Exception: Any error raised while writing to the database is
            re-raised after the session has been rolled back.
    """
    from web.models import ScanConfig, ScanConfigSite, Site

    config = self.db.query(ScanConfig).filter_by(id=config_id).first()
    if not config:
        raise ValueError(f"Config with ID {config_id} not found")

    # ---- Validate everything first; nothing is mutated in this phase ----
    new_title = None
    if title is not None:
        new_title = title.strip()
        if not new_title:
            raise ValueError("Title cannot be empty")

    unique_site_ids = None
    if site_ids is not None:
        if len(site_ids) == 0:
            raise ValueError("At least one site must be selected")
        # Collapse repeated IDs while keeping caller order. Without this,
        # [1, 1] fails the existence check even though the site exists, and
        # would otherwise create duplicate association rows.
        unique_site_ids = list(dict.fromkeys(site_ids))
        # Verify all sites exist
        existing_sites = self.db.query(Site).filter(Site.id.in_(unique_site_ids)).all()
        missing_ids = set(unique_site_ids) - {site.id for site in existing_sites}
        if missing_ids:
            raise ValueError(f"Sites not found: {missing_ids}")

    # ---- Apply the changes ----
    now = datetime.utcnow()
    try:
        if new_title is not None:
            config.title = new_title
        if description is not None:
            config.description = description.strip() or None
        if unique_site_ids is not None:
            # Replace the full set of associations with the new one.
            self.db.query(ScanConfigSite).filter_by(config_id=config_id).delete()
            for site_id in unique_site_ids:
                self.db.add(ScanConfigSite(
                    config_id=config_id,
                    site_id=site_id,
                    created_at=now,
                ))
        config.updated_at = now
        self.db.commit()
    except Exception:
        # Leave the session usable for the caller after a failed write.
        self.db.rollback()
        raise

    return self.get_config_by_id(config_id)
def delete_config(self, config_id: int) -> None:
    """
    Remove a scan configuration from the database and commit.

    NOTE(review): the original notes say associated ScanConfigSite records
    are cascade-deleted and that schedules/scans referencing the config get
    config_id set to NULL. That depends on the model definitions, which are
    not visible here; confirm against web.models.

    Args:
        config_id: Configuration ID to delete

    Raises:
        ValueError: If config not found
    """
    from web.models import ScanConfig

    target = self.db.query(ScanConfig).filter_by(id=config_id).first()
    if target:
        self.db.delete(target)
        self.db.commit()
        return
    raise ValueError(f"Config with ID {config_id} not found")
def add_site_to_config(self, config_id: int, site_id: int) -> Dict[str, Any]:
    """
    Attach one site to an existing config and bump the config's updated_at.

    Args:
        config_id: Configuration ID
        site_id: Site ID to add

    Returns:
        Updated config dictionary (as produced by ``get_config_by_id``)

    Raises:
        ValueError: If config or site not found, or association already exists
    """
    from web.models import ScanConfig, Site, ScanConfigSite

    session = self.db

    config_row = session.query(ScanConfig).filter_by(id=config_id).first()
    if not config_row:
        raise ValueError(f"Config with ID {config_id} not found")

    site_row = session.query(Site).filter_by(id=site_id).first()
    if not site_row:
        raise ValueError(f"Site with ID {site_id} not found")

    # Refuse to link the same site twice.
    link_lookup = session.query(ScanConfigSite).filter_by(
        config_id=config_id, site_id=site_id
    )
    if link_lookup.first():
        raise ValueError(f"Site '{site_row.name}' is already in this config")

    session.add(ScanConfigSite(
        config_id=config_id,
        site_id=site_id,
        created_at=datetime.utcnow()
    ))
    config_row.updated_at = datetime.utcnow()
    session.commit()
    return self.get_config_by_id(config_id)
def remove_site_from_config(self, config_id: int, site_id: int) -> Dict[str, Any]:
    """
    Detach one site from a config and bump the config's updated_at.

    The "last site" guard runs before the membership check, so a config
    with a single site always reports the last-site error, whichever
    site_id was passed.

    Args:
        config_id: Configuration ID
        site_id: Site ID to remove

    Returns:
        Updated config dictionary (as produced by ``get_config_by_id``)

    Raises:
        ValueError: If config not found, if removing would leave the config
            empty, or if the site is not part of the config
    """
    from web.models import ScanConfig, ScanConfigSite

    owner = self.db.query(ScanConfig).filter_by(id=config_id).first()
    if not owner:
        raise ValueError(f"Config with ID {config_id} not found")

    # A config must always keep at least one site.
    if len(owner.site_associations) <= 1:
        raise ValueError("Cannot remove last site from config. Delete the config instead.")

    matching_links = self.db.query(ScanConfigSite).filter_by(
        config_id=config_id, site_id=site_id
    )
    rows_removed = matching_links.delete()
    if not rows_removed:
        raise ValueError(f"Site with ID {site_id} is not in this config")

    owner.updated_at = datetime.utcnow()
    self.db.commit()
    return self.get_config_by_id(config_id)