# Source listing: SneakyScan/app/web/services/config_service.py (968 lines, 32 KiB, Python)
"""
Config Service - Business logic for config management
This service handles all operations related to scan configurations,
both database-stored (primary) and file-based (deprecated).
"""
import ipaddress
import logging
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional

import yaml
from sqlalchemy.orm import Session
from werkzeug.utils import secure_filename
class ConfigService:
    """Business logic for scan configuration management.

    Database-stored configurations are the primary mechanism. The YAML file
    operations are retained for legacy configs and are deprecated.
    """

    # Upper bound on the number of addresses create_from_cidr() will expand.
    MAX_CIDR_ADDRESSES = 10000

    # Longest filename stem produced by generate_filename_from_title()
    # (200 characters in total, 5 of them reserved for ".yaml").
    MAX_FILENAME_STEM = 195

    _logger = logging.getLogger(__name__)

    def __init__(self, db_session: "Optional[Session]" = None, configs_dir: str = '/app/configs'):
        """
        Initialize the config service.

        Args:
            db_session: SQLAlchemy database session (for database operations)
            configs_dir: Directory where legacy config files are stored
        """
        self.db = db_session
        self.configs_dir = configs_dir
        # Ensure configs directory exists (for legacy YAML configs)
        os.makedirs(self.configs_dir, exist_ok=True)

    # ============================================================================
    # Private helpers
    # ============================================================================

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Equivalent to the deprecated ``datetime.utcnow()``; the value stays
        naive because timestamps are serialized as ``isoformat() + 'Z'``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _format_timestamp(value: Optional[datetime]) -> Optional[str]:
        """Serialize a naive UTC datetime as ISO-8601 with a 'Z' suffix (None stays None)."""
        return value.isoformat() + 'Z' if value else None

    @staticmethod
    def _unique_site_ids(site_ids: List[int]) -> List[int]:
        """Drop duplicate site IDs while preserving the caller's order."""
        return list(dict.fromkeys(site_ids))

    def _ensure_sites_exist(self, site_ids: List[int]) -> None:
        """Raise ValueError naming every ID in site_ids that has no Site row."""
        # Import models here to avoid circular imports
        from web.models import Site

        found_ids = {
            site.id
            for site in self.db.query(Site).filter(Site.id.in_(site_ids)).all()
        }
        missing_ids = set(site_ids) - found_ids
        if missing_ids:
            raise ValueError(f"Sites not found: {missing_ids}")

    def _get_config_or_raise(self, config_id: int) -> Any:
        """Load the ScanConfig row with the given ID or raise ValueError."""
        # Import models here to avoid circular imports
        from web.models import ScanConfig

        config = self.db.query(ScanConfig).filter_by(id=config_id).first()
        if not config:
            raise ValueError(f"Config with ID {config_id} not found")
        return config

    def _serialize_config(self, config: Any, include_site_details: bool) -> Dict[str, Any]:
        """Convert a ScanConfig row and its associated sites into a dictionary.

        Args:
            config: ScanConfig row to serialize
            include_site_details: If True, each site entry also carries
                'description' and 'ip_count'
        """
        sites = []
        for assoc in config.site_associations:
            site = assoc.site
            entry = {'id': site.id, 'name': site.name}
            if include_site_details:
                entry['description'] = site.description
                entry['ip_count'] = len(site.ips)
            sites.append(entry)
        return {
            'id': config.id,
            'title': config.title,
            'description': config.description,
            'site_count': len(sites),
            'sites': sites,
            'created_at': self._format_timestamp(config.created_at),
            'updated_at': self._format_timestamp(config.updated_at)
        }

    def _resolve_config_path(self, filename: str) -> str:
        """Join filename onto configs_dir, refusing paths that escape it.

        The containment check is done on fully resolved paths so that '..'
        components and symlinks cannot be used to reach files outside the
        configs directory.

        Raises:
            ValueError: If filename is empty or resolves outside configs_dir
        """
        if not isinstance(filename, str) or not filename:
            raise ValueError("Config filename is required")
        candidate = os.path.join(self.configs_dir, filename)
        base_real = os.path.realpath(self.configs_dir)
        candidate_real = os.path.realpath(candidate)
        try:
            inside = os.path.commonpath([base_real, candidate_real]) == base_real
        except ValueError:
            # commonpath() rejects paths it cannot compare (e.g. different drives)
            inside = False
        if not inside or candidate_real == base_real:
            raise ValueError(f"Invalid config filename: '{filename}'")
        return candidate

    @staticmethod
    def _parse_yaml(content: str) -> Any:
        """Parse YAML text with the safe loader, mapping syntax errors to ValueError."""
        try:
            return yaml.safe_load(content)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML syntax: {str(e)}") from e

    @staticmethod
    def _write_new_config_file(filepath: str, filename: str, content: str) -> None:
        """Create filepath with content, failing if the file already exists.

        Exclusive-create mode closes the window between an earlier existence
        check and the write.
        """
        try:
            with open(filepath, 'x', encoding='utf-8') as f:
                f.write(content)
        except FileExistsError as e:
            raise ValueError(f"Config file '{filename}' already exists") from e

    @staticmethod
    def _read_config_title(filepath: str, fallback: str) -> Any:
        """Return the 'title' value of a YAML config file, or fallback.

        The fallback is used when the file cannot be read or parsed, when its
        top level is not a mapping, or when it has no 'title' key.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except (OSError, ValueError, yaml.YAMLError):
            return fallback
        if isinstance(data, dict):
            return data.get('title', fallback)
        return fallback

    # ============================================================================
    # Database-based Config Operations (Primary)
    # ============================================================================

    def create_config(self, title: str, description: Optional[str], site_ids: List[int]) -> Dict[str, Any]:
        """
        Create a new scan configuration in the database.

        Args:
            title: Configuration title
            description: Optional configuration description
            site_ids: List of site IDs to include in this config
                (duplicate IDs are ignored)

        Returns:
            Created config as dictionary:
            {
                "id": 1,
                "title": "Production Scan",
                "description": "...",
                "site_count": 3,
                "sites": [...],
                "created_at": "2025-11-19T10:30:00Z",
                "updated_at": "2025-11-19T10:30:00Z"
            }

        Raises:
            ValueError: If validation fails or sites don't exist
        """
        if not title or not title.strip():
            raise ValueError("Title is required")
        if not site_ids:
            raise ValueError("At least one site must be selected")

        # Import models here to avoid circular imports
        from web.models import ScanConfig, ScanConfigSite

        unique_ids = self._unique_site_ids(site_ids)
        self._ensure_sites_exist(unique_ids)

        # One timestamp for the whole operation keeps created_at == updated_at
        now = self._utcnow()
        config = ScanConfig(
            title=title.strip(),
            # Whitespace-only descriptions are stored as NULL, as in update_config
            description=(description.strip() or None) if description else None,
            created_at=now,
            updated_at=now
        )
        self.db.add(config)
        self.db.flush()  # Get the config ID

        for site_id in unique_ids:
            self.db.add(ScanConfigSite(
                config_id=config.id,
                site_id=site_id,
                created_at=now
            ))
        self.db.commit()
        return self.get_config_by_id(config.id)

    def get_config_by_id(self, config_id: int) -> Dict[str, Any]:
        """
        Get a scan configuration by ID.

        Args:
            config_id: Configuration ID

        Returns:
            Config as dictionary; each site entry has 'id', 'name',
            'description' and 'ip_count'

        Raises:
            ValueError: If config not found
        """
        config = self._get_config_or_raise(config_id)
        return self._serialize_config(config, include_site_details=True)

    def list_configs_db(self) -> List[Dict[str, Any]]:
        """
        List all scan configurations from database, most recently updated first.

        Returns:
            List of config dictionaries; each site entry has 'id' and 'name'
        """
        from web.models import ScanConfig

        configs = self.db.query(ScanConfig).order_by(ScanConfig.updated_at.desc()).all()
        return [
            self._serialize_config(config, include_site_details=False)
            for config in configs
        ]

    def update_config(self, config_id: int, title: Optional[str], description: Optional[str], site_ids: Optional[List[int]]) -> Dict[str, Any]:
        """
        Update a scan configuration.

        All arguments are validated before the config is modified.

        Args:
            config_id: Configuration ID to update
            title: New title (optional)
            description: New description (optional; blank clears it)
            site_ids: New list of site IDs (optional, replaces existing;
                duplicate IDs are ignored)

        Returns:
            Updated config dictionary

        Raises:
            ValueError: If config not found or validation fails
        """
        from web.models import ScanConfigSite

        config = self._get_config_or_raise(config_id)

        # Validate everything first so a failure leaves the row untouched
        if title is not None and not title.strip():
            raise ValueError("Title cannot be empty")
        unique_ids = None
        if site_ids is not None:
            if not site_ids:
                raise ValueError("At least one site must be selected")
            unique_ids = self._unique_site_ids(site_ids)
            self._ensure_sites_exist(unique_ids)

        now = self._utcnow()
        if title is not None:
            config.title = title.strip()
        if description is not None:
            config.description = description.strip() or None
        if unique_ids is not None:
            # Replace the full set of associations
            self.db.query(ScanConfigSite).filter_by(config_id=config_id).delete()
            for site_id in unique_ids:
                self.db.add(ScanConfigSite(
                    config_id=config_id,
                    site_id=site_id,
                    created_at=now
                ))
        config.updated_at = now
        self.db.commit()
        return self.get_config_by_id(config_id)

    def delete_config(self, config_id: int) -> None:
        """
        Delete a scan configuration from database.

        NOTE(review): per the original documentation, associated
        ScanConfigSite records are cascade deleted and schedules/scans
        referencing this config get config_id set to NULL; that behavior is
        defined by the models, not by this method.

        Args:
            config_id: Configuration ID to delete

        Raises:
            ValueError: If config not found
        """
        config = self._get_config_or_raise(config_id)
        self.db.delete(config)
        self.db.commit()

    def add_site_to_config(self, config_id: int, site_id: int) -> Dict[str, Any]:
        """
        Add a site to an existing config.

        Args:
            config_id: Configuration ID
            site_id: Site ID to add

        Returns:
            Updated config dictionary

        Raises:
            ValueError: If config or site not found, or association already exists
        """
        from web.models import Site, ScanConfigSite

        config = self._get_config_or_raise(config_id)
        site = self.db.query(Site).filter_by(id=site_id).first()
        if not site:
            raise ValueError(f"Site with ID {site_id} not found")

        already_linked = self.db.query(ScanConfigSite).filter_by(
            config_id=config_id, site_id=site_id
        ).first()
        if already_linked:
            raise ValueError(f"Site '{site.name}' is already in this config")

        now = self._utcnow()
        self.db.add(ScanConfigSite(
            config_id=config_id,
            site_id=site_id,
            created_at=now
        ))
        config.updated_at = now
        self.db.commit()
        return self.get_config_by_id(config_id)

    def remove_site_from_config(self, config_id: int, site_id: int) -> Dict[str, Any]:
        """
        Remove a site from a config.

        Args:
            config_id: Configuration ID
            site_id: Site ID to remove

        Returns:
            Updated config dictionary

        Raises:
            ValueError: If config not found, the site is not in the config,
                or removing would leave config empty
        """
        from web.models import ScanConfigSite

        config = self._get_config_or_raise(config_id)

        # A config must always keep at least one site
        if len(config.site_associations) <= 1:
            raise ValueError("Cannot remove last site from config. Delete the config instead.")

        deleted = self.db.query(ScanConfigSite).filter_by(
            config_id=config_id, site_id=site_id
        ).delete()
        if deleted == 0:
            raise ValueError(f"Site with ID {site_id} is not in this config")

        config.updated_at = self._utcnow()
        self.db.commit()
        return self.get_config_by_id(config_id)

    # ============================================================================
    # Legacy YAML File Operations (Deprecated)
    # ============================================================================

    def list_configs_file(self) -> List[Dict[str, Any]]:
        """
        [DEPRECATED] List all config files with metadata.

        Returns:
            List of config metadata dictionaries, most recent first:
            [
                {
                    "filename": "prod-scan.yaml",
                    "title": "Prod Scan",
                    "path": "/app/configs/prod-scan.yaml",
                    "created_at": "2025-11-15T10:30:00Z",
                    "size_bytes": 1234,
                    "used_by_schedules": ["Daily Scan", "Weekly Audit"]
                }
            ]
            'created_at' is derived from the file's modification time (UTC).
        """
        configs = []
        if not os.path.exists(self.configs_dir):
            return configs

        for filename in os.listdir(self.configs_dir):
            if not filename.endswith(('.yaml', '.yml')):
                continue
            filepath = os.path.join(self.configs_dir, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                stat_info = os.stat(filepath)
                # Convert to UTC explicitly: the 'Z' suffix claims UTC, so a
                # local-time conversion would mislabel the timestamp.
                modified = datetime.fromtimestamp(stat_info.st_mtime, tz=timezone.utc)
            except (OSError, OverflowError, ValueError) as exc:
                # Skip files whose metadata can't be read, but leave a trace
                self._logger.warning("Skipping config file '%s': %s", filename, exc)
                continue
            configs.append({
                'filename': filename,
                'title': self._read_config_title(filepath, filename),
                'path': filepath,
                'created_at': modified.replace(tzinfo=None).isoformat() + 'Z',
                'size_bytes': stat_info.st_size,
                'used_by_schedules': self.get_schedules_using_config(filename)
            })

        # Sort by created_at (most recent first)
        configs.sort(key=lambda item: item['created_at'], reverse=True)
        return configs

    def get_config(self, filename: str) -> Dict[str, Any]:
        """
        Get config file content and parsed data.

        Args:
            filename: Config filename

        Returns:
            {
                "filename": "prod-scan.yaml",
                "content": "title: Prod Scan\\n...",
                "parsed": {"title": "Prod Scan", "sites": [...]}
            }

        Raises:
            FileNotFoundError: If config doesn't exist
            ValueError: If filename escapes the configs directory or the
                config content is invalid YAML
        """
        filepath = self._resolve_config_path(filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Config file '{filename}' not found")
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        return {
            'filename': filename,
            'content': content,
            'parsed': self._parse_yaml(content)
        }

    def create_from_yaml(self, filename: str, content: str) -> str:
        """
        Create config from YAML content.

        Args:
            filename: Desired filename (will be sanitized)
            content: YAML content string

        Returns:
            Final filename (sanitized)

        Raises:
            ValueError: If content invalid, filename unusable, or filename conflict
        """
        filename = secure_filename(filename)
        # secure_filename() returns '' when nothing safe is left of the input
        if not filename:
            raise ValueError("Invalid config filename")
        if not filename.endswith(('.yaml', '.yml')):
            filename += '.yaml'
        filepath = os.path.join(self.configs_dir, filename)

        # Early conflict check so nothing is created in the database for a
        # file that cannot be written; the exclusive write below re-checks.
        if os.path.exists(filepath):
            raise ValueError(f"Config file '{filename}' already exists")

        parsed = self._parse_yaml(content)
        is_valid, error_msg = self.validate_config_content(parsed)
        if not is_valid:
            raise ValueError(f"Invalid config structure: {error_msg}")

        # Create inline sites in database (if any)
        self.create_inline_sites(parsed)
        self._write_new_config_file(filepath, filename, content)
        return filename

    def create_from_cidr(
        self,
        title: str,
        cidr: str,
        site_name: Optional[str] = None,
        ping_default: bool = False
    ) -> Tuple[str, str]:
        """
        Create config from CIDR range.

        Args:
            title: Scan configuration title
            cidr: CIDR range (e.g., "10.0.0.0/24")
            site_name: Optional site name (defaults to "Site 1")
            ping_default: Default ping expectation for all IPs

        Returns:
            Tuple of (final_filename, yaml_content)

        Raises:
            ValueError: If title is blank, CIDR invalid or too large, or the
                generated filename already exists
        """
        if not title or not title.strip():
            raise ValueError("Title is required")

        try:
            network = ipaddress.ip_network(cidr, strict=False)
        except ValueError as e:
            raise ValueError(f"Invalid CIDR range: {str(e)}") from e

        # Prevent expansion of huge ranges
        if network.num_addresses > self.MAX_CIDR_ADDRESSES:
            raise ValueError(
                f"CIDR range too large: {network.num_addresses} addresses. "
                f"Maximum is {self.MAX_CIDR_ADDRESSES:,}."
            )

        ip_list = [str(ip) for ip in network.hosts()]
        # hosts() may be empty for single-address networks (/32, /128) on
        # some Python versions; fall back to the network address itself.
        if not ip_list:
            ip_list = [str(network.network_address)]

        if not site_name or not site_name.strip():
            site_name = "Site 1"

        config_data = {
            'title': title.strip(),
            'sites': [
                {
                    'name': site_name.strip(),
                    'ips': [
                        {
                            'address': ip_address,
                            'expected': {
                                'ping': ping_default,
                                'tcp_ports': [],
                                'udp_ports': []
                            }
                        }
                        for ip_address in ip_list
                    ]
                }
            ]
        }
        yaml_content = yaml.dump(config_data, sort_keys=False, default_flow_style=False)

        filename = self.generate_filename_from_title(title)
        filepath = os.path.join(self.configs_dir, filename)
        self._write_new_config_file(filepath, filename, yaml_content)
        return filename, yaml_content

    def update_config_file(self, filename: str, yaml_content: str) -> None:
        """
        [DEPRECATED] Update existing config file with new YAML content.

        Args:
            filename: Config filename to update
            yaml_content: New YAML content string

        Raises:
            FileNotFoundError: If config doesn't exist
            ValueError: If filename escapes the configs directory or the
                YAML content is invalid
        """
        filepath = self._resolve_config_path(filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Config file '{filename}' not found")

        parsed = self._parse_yaml(yaml_content)
        is_valid, error_msg = self.validate_config_content(parsed)
        if not is_valid:
            raise ValueError(f"Invalid config structure: {error_msg}")

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(yaml_content)

    def delete_config_file(self, filename: str) -> None:
        """
        [DEPRECATED] Delete a legacy config file.

        Schedules now reference database configs through config_id, so no
        schedule is touched here; a deprecation warning is logged instead.

        Args:
            filename: Config filename to delete

        Raises:
            FileNotFoundError: If config doesn't exist
            ValueError: If filename escapes the configs directory
        """
        filepath = self._resolve_config_path(filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Config file '{filename}' not found")

        self._logger.warning(
            "delete_config_file called for '%s' - this is deprecated. "
            "Use database configs with config_id instead.",
            filename
        )
        os.remove(filepath)

    def validate_config_content(self, content: Dict, check_site_refs: bool = True) -> Tuple[bool, str]:
        """
        Validate parsed YAML config structure.

        Supports both legacy format (inline IPs) and new format (site references or CIDRs).

        Args:
            content: Parsed YAML config as dict
            check_site_refs: If True, validates that referenced sites exist in database

        Returns:
            Tuple of (is_valid, error_message); error_message is "" when valid
        """
        if not isinstance(content, dict):
            return False, "Config must be a dictionary/object"

        for field in ('title', 'sites'):
            if field not in content:
                return False, f"Missing required field: '{field}'"

        title = content['title']
        if not isinstance(title, str) or not title.strip():
            return False, "Field 'title' must be a non-empty string"

        sites = content['sites']
        if not isinstance(sites, list):
            return False, "Field 'sites' must be a list"
        if not sites:
            return False, "Must have at least one site defined"

        for position, site in enumerate(sites, start=1):
            if not isinstance(site, dict):
                return False, f"Site {position} must be a dictionary/object"
            if 'site_ref' in site:
                error = self._validate_site_reference(site, position, check_site_refs)
            elif 'cidrs' in site:
                error = self._validate_cidr_site(site, position)
            else:
                error = self._validate_legacy_site(site, position)
            if error:
                return False, error
        return True, ""

    def _validate_site_reference(self, site: Dict, position: int, check_site_refs: bool) -> Optional[str]:
        """Validate a 'site_ref' entry; return an error message or None."""
        site_ref = site.get('site_ref')
        if not isinstance(site_ref, str) or not site_ref.strip():
            return f"Site {position} field 'site_ref' must be a non-empty string"
        if not check_site_refs:
            return None
        try:
            from web.services.site_service import SiteService
            from flask import current_app
            site_service = SiteService(current_app.db_session)
            if not site_service.get_site_by_name(site_ref):
                return f"Site {position}: Referenced site '{site_ref}' does not exist"
        except Exception as exc:
            # Deliberately best-effort: if the lookup is impossible (e.g.
            # outside an app context) the reference is not rejected.
            self._logger.debug("Skipping site reference check for '%s': %s", site_ref, exc)
        return None

    @staticmethod
    def _validate_cidr_site(site: Dict, position: int) -> Optional[str]:
        """Validate an inline site defined by 'cidrs'; return an error message or None."""
        if 'name' not in site:
            return f"Site {position} with inline CIDRs missing required field: 'name'"
        cidrs = site.get('cidrs')
        if not isinstance(cidrs, list):
            return f"Site {position} field 'cidrs' must be a list"
        if not cidrs:
            return f"Site {position} must have at least one CIDR"

        for cidr_position, cidr_config in enumerate(cidrs, start=1):
            if not isinstance(cidr_config, dict):
                return f"Site {position} CIDR {cidr_position} must be a dictionary/object"
            if 'cidr' not in cidr_config:
                return f"Site {position} CIDR {cidr_position} missing required field: 'cidr'"
            cidr_str = cidr_config.get('cidr')
            invalid = f"Site {position} CIDR {cidr_position}: Invalid CIDR notation '{cidr_str}'"
            # ip_network() also accepts integers, which are never valid here
            if not isinstance(cidr_str, str):
                return invalid
            try:
                ipaddress.ip_network(cidr_str, strict=False)
            except ValueError:
                return invalid
        return None

    @staticmethod
    def _validate_legacy_site(site: Dict, position: int) -> Optional[str]:
        """Validate a legacy site with inline 'ips'; return an error message or None."""
        if 'name' not in site:
            return f"Site {position} missing required field: 'name'"
        if 'ips' not in site:
            return f"Site {position} missing required field: 'ips' (or use 'site_ref' or 'cidrs')"
        ips = site['ips']
        if not isinstance(ips, list):
            return f"Site {position} field 'ips' must be a list"
        if not ips:
            return f"Site {position} must have at least one IP"

        for ip_position, ip_config in enumerate(ips, start=1):
            label = f"Site {position} IP {ip_position}"
            if not isinstance(ip_config, dict):
                return f"{label} must be a dictionary/object"
            if 'address' not in ip_config:
                return f"{label} missing required field: 'address'"
            if 'expected' not in ip_config:
                return f"{label} missing required field: 'expected'"
            if not isinstance(ip_config['expected'], dict):
                return f"{label} field 'expected' must be a dictionary/object"
        return None

    def get_schedules_using_config(self, filename: str) -> List[str]:
        """
        [DEPRECATED] Get list of schedule names using this config file.

        Schedules now reference configs through config_id rather than a
        config file, so no schedule can use a file-based config.

        Args:
            filename: Config filename (unused; kept for compatibility)

        Returns:
            Always an empty list
        """
        return []

    def generate_filename_from_title(self, title: str) -> str:
        """
        Generate safe filename from scan title.

        Args:
            title: Scan title string

        Returns:
            Safe filename (e.g., "Prod Scan 2025" -> "prod-scan-2025.yaml");
            "config.yaml" when no usable characters remain
        """
        stem = title.lower().replace(' ', '-')
        # Keep only alphanumerics, hyphens and underscores
        stem = re.sub(r'[^a-z0-9\-_]', '', stem)
        # Collapse runs of hyphens, then trim them from both ends
        stem = re.sub(r'-+', '-', stem).strip('-')
        if len(stem) > self.MAX_FILENAME_STEM:
            # Truncation can expose a hyphen at the cut, so trim again
            stem = stem[:self.MAX_FILENAME_STEM].strip('-')
        return (stem or 'config') + '.yaml'

    def get_config_path(self, filename: str) -> str:
        """
        Get the path of a config file inside the configs directory.

        Args:
            filename: Config filename

        Returns:
            configs_dir joined with filename (absolute when configs_dir is)
        """
        return os.path.join(self.configs_dir, filename)

    def config_exists(self, filename: str) -> bool:
        """
        Check if a config file exists.

        Args:
            filename: Config filename

        Returns:
            True if a regular file with that name exists, False otherwise
        """
        # isfile() is already False for missing paths
        return os.path.isfile(os.path.join(self.configs_dir, filename))

    def create_inline_sites(self, config_content: Dict) -> None:
        """
        Create sites in the database for inline site definitions in a config.

        Scans the config for inline site definitions (with CIDRs) and creates
        them as reusable sites if no site with that name exists yet. Site
        references and legacy IP-based sites are skipped.

        This is best-effort: any failure is logged as a warning and never
        propagated, so it cannot block config creation.

        Args:
            config_content: Parsed YAML config dictionary
        """
        try:
            from web.services.site_service import SiteService
            from flask import current_app
            site_service = SiteService(current_app.db_session)

            description = f"Auto-created from config '{config_content.get('title', 'Unknown')}'"
            for site_def in config_content.get('sites', []):
                # Only inline CIDR-based sites are turned into reusable sites
                if 'site_ref' in site_def or 'cidrs' not in site_def:
                    continue
                site_name = site_def.get('name')
                if site_service.get_site_by_name(site_name):
                    # Site already exists, skip creation
                    continue
                site_service.create_site(
                    name=site_name,
                    description=description,
                    cidrs=site_def.get('cidrs', [])
                )
        except Exception as e:
            # If site creation fails, log but don't block config creation
            self._logger.warning(
                "Failed to create inline sites from config: %s", str(e)
            )