code cleanup, UI change to menu to make it cleaner
This commit is contained in:
@@ -6,13 +6,8 @@ both database-stored (primary) and file-based (deprecated).
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import yaml
|
||||
import ipaddress
|
||||
from typing import Dict, List, Tuple, Any, Optional
|
||||
from typing import Dict, List, Any, Optional
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from werkzeug.utils import secure_filename
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
@@ -342,626 +337,3 @@ class ConfigService:
|
||||
self.db.commit()
|
||||
|
||||
return self.get_config_by_id(config_id)
|
||||
|
||||
# ============================================================================
|
||||
# Legacy YAML File Operations (Deprecated)
|
||||
# ============================================================================
|
||||
|
||||
def list_configs_file(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
[DEPRECATED] List all config files with metadata.
|
||||
|
||||
Returns:
|
||||
List of config metadata dictionaries:
|
||||
[
|
||||
{
|
||||
"filename": "prod-scan.yaml",
|
||||
"title": "Prod Scan",
|
||||
"path": "/app/configs/prod-scan.yaml",
|
||||
"created_at": "2025-11-15T10:30:00Z",
|
||||
"size_bytes": 1234,
|
||||
"used_by_schedules": ["Daily Scan", "Weekly Audit"]
|
||||
}
|
||||
]
|
||||
"""
|
||||
configs = []
|
||||
|
||||
# Get all YAML files in configs directory
|
||||
if not os.path.exists(self.configs_dir):
|
||||
return configs
|
||||
|
||||
for filename in os.listdir(self.configs_dir):
|
||||
if not filename.endswith(('.yaml', '.yml')):
|
||||
continue
|
||||
|
||||
filepath = os.path.join(self.configs_dir, filename)
|
||||
|
||||
if not os.path.isfile(filepath):
|
||||
continue
|
||||
|
||||
try:
|
||||
# Get file metadata
|
||||
stat_info = os.stat(filepath)
|
||||
created_at = datetime.fromtimestamp(stat_info.st_mtime).isoformat() + 'Z'
|
||||
size_bytes = stat_info.st_size
|
||||
|
||||
# Parse YAML to get title
|
||||
title = None
|
||||
try:
|
||||
with open(filepath, 'r') as f:
|
||||
data = yaml.safe_load(f)
|
||||
if isinstance(data, dict):
|
||||
title = data.get('title', filename)
|
||||
except Exception:
|
||||
title = filename # Fallback to filename if parsing fails
|
||||
|
||||
# Get schedules using this config
|
||||
used_by_schedules = self.get_schedules_using_config(filename)
|
||||
|
||||
configs.append({
|
||||
'filename': filename,
|
||||
'title': title,
|
||||
'path': filepath,
|
||||
'created_at': created_at,
|
||||
'size_bytes': size_bytes,
|
||||
'used_by_schedules': used_by_schedules
|
||||
})
|
||||
except Exception as e:
|
||||
# Skip files that can't be read
|
||||
continue
|
||||
|
||||
# Sort by created_at (most recent first)
|
||||
configs.sort(key=lambda x: x['created_at'], reverse=True)
|
||||
|
||||
return configs
|
||||
|
||||
def get_config(self, filename: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get config file content and parsed data.
|
||||
|
||||
Args:
|
||||
filename: Config filename
|
||||
|
||||
Returns:
|
||||
{
|
||||
"filename": "prod-scan.yaml",
|
||||
"content": "title: Prod Scan\n...",
|
||||
"parsed": {"title": "Prod Scan", "sites": [...]}
|
||||
}
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If config doesn't exist
|
||||
ValueError: If config content is invalid
|
||||
"""
|
||||
filepath = os.path.join(self.configs_dir, filename)
|
||||
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError(f"Config file '{filename}' not found")
|
||||
|
||||
# Read file content
|
||||
with open(filepath, 'r') as f:
|
||||
content = f.read()
|
||||
|
||||
# Parse YAML
|
||||
try:
|
||||
parsed = yaml.safe_load(content)
|
||||
except yaml.YAMLError as e:
|
||||
raise ValueError(f"Invalid YAML syntax: {str(e)}")
|
||||
|
||||
return {
|
||||
'filename': filename,
|
||||
'content': content,
|
||||
'parsed': parsed
|
||||
}
|
||||
|
||||
def create_from_yaml(self, filename: str, content: str) -> str:
|
||||
"""
|
||||
Create config from YAML content.
|
||||
|
||||
Args:
|
||||
filename: Desired filename (will be sanitized)
|
||||
content: YAML content string
|
||||
|
||||
Returns:
|
||||
Final filename (sanitized)
|
||||
|
||||
Raises:
|
||||
ValueError: If content invalid or filename conflict
|
||||
"""
|
||||
# Sanitize filename
|
||||
filename = secure_filename(filename)
|
||||
|
||||
# Ensure .yaml extension
|
||||
if not filename.endswith(('.yaml', '.yml')):
|
||||
filename += '.yaml'
|
||||
|
||||
filepath = os.path.join(self.configs_dir, filename)
|
||||
|
||||
# Check for conflicts
|
||||
if os.path.exists(filepath):
|
||||
raise ValueError(f"Config file '{filename}' already exists")
|
||||
|
||||
# Parse and validate YAML
|
||||
try:
|
||||
parsed = yaml.safe_load(content)
|
||||
except yaml.YAMLError as e:
|
||||
raise ValueError(f"Invalid YAML syntax: {str(e)}")
|
||||
|
||||
# Validate config structure
|
||||
is_valid, error_msg = self.validate_config_content(parsed)
|
||||
if not is_valid:
|
||||
raise ValueError(f"Invalid config structure: {error_msg}")
|
||||
|
||||
# Create inline sites in database (if any)
|
||||
self.create_inline_sites(parsed)
|
||||
|
||||
# Write file
|
||||
with open(filepath, 'w') as f:
|
||||
f.write(content)
|
||||
|
||||
return filename
|
||||
|
||||
def create_from_cidr(
|
||||
self,
|
||||
title: str,
|
||||
cidr: str,
|
||||
site_name: Optional[str] = None,
|
||||
ping_default: bool = False
|
||||
) -> Tuple[str, str]:
|
||||
"""
|
||||
Create config from CIDR range.
|
||||
|
||||
Args:
|
||||
title: Scan configuration title
|
||||
cidr: CIDR range (e.g., "10.0.0.0/24")
|
||||
site_name: Optional site name (defaults to "Site 1")
|
||||
ping_default: Default ping expectation for all IPs
|
||||
|
||||
Returns:
|
||||
Tuple of (final_filename, yaml_content)
|
||||
|
||||
Raises:
|
||||
ValueError: If CIDR invalid or other validation errors
|
||||
"""
|
||||
# Validate and parse CIDR
|
||||
try:
|
||||
network = ipaddress.ip_network(cidr, strict=False)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid CIDR range: {str(e)}")
|
||||
|
||||
# Check if network is too large (prevent expansion of huge ranges)
|
||||
if network.num_addresses > 10000:
|
||||
raise ValueError(f"CIDR range too large: {network.num_addresses} addresses. Maximum is 10,000.")
|
||||
|
||||
# Expand CIDR to list of IP addresses
|
||||
ip_list = [str(ip) for ip in network.hosts()]
|
||||
|
||||
# If network has only 1 address (like /32 or /128), hosts() returns empty
|
||||
# In that case, use the network address itself
|
||||
if not ip_list:
|
||||
ip_list = [str(network.network_address)]
|
||||
|
||||
# Build site name
|
||||
if not site_name or not site_name.strip():
|
||||
site_name = "Site 1"
|
||||
|
||||
# Build IP configurations
|
||||
ips = []
|
||||
for ip_address in ip_list:
|
||||
ips.append({
|
||||
'address': ip_address,
|
||||
'expected': {
|
||||
'ping': ping_default,
|
||||
'tcp_ports': [],
|
||||
'udp_ports': []
|
||||
}
|
||||
})
|
||||
|
||||
# Build YAML structure
|
||||
config_data = {
|
||||
'title': title.strip(),
|
||||
'sites': [
|
||||
{
|
||||
'name': site_name.strip(),
|
||||
'ips': ips
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Convert to YAML string
|
||||
yaml_content = yaml.dump(config_data, sort_keys=False, default_flow_style=False)
|
||||
|
||||
# Generate filename from title
|
||||
filename = self.generate_filename_from_title(title)
|
||||
|
||||
filepath = os.path.join(self.configs_dir, filename)
|
||||
|
||||
# Check for conflicts
|
||||
if os.path.exists(filepath):
|
||||
raise ValueError(f"Config file '{filename}' already exists")
|
||||
|
||||
# Write file
|
||||
with open(filepath, 'w') as f:
|
||||
f.write(yaml_content)
|
||||
|
||||
return filename, yaml_content
|
||||
|
||||
def update_config_file(self, filename: str, yaml_content: str) -> None:
|
||||
"""
|
||||
[DEPRECATED] Update existing config file with new YAML content.
|
||||
|
||||
Args:
|
||||
filename: Config filename to update
|
||||
yaml_content: New YAML content string
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If config doesn't exist
|
||||
ValueError: If YAML content is invalid
|
||||
"""
|
||||
filepath = os.path.join(self.configs_dir, filename)
|
||||
|
||||
# Check if file exists
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError(f"Config file '{filename}' not found")
|
||||
|
||||
# Parse and validate YAML
|
||||
try:
|
||||
parsed = yaml.safe_load(yaml_content)
|
||||
except yaml.YAMLError as e:
|
||||
raise ValueError(f"Invalid YAML syntax: {str(e)}")
|
||||
|
||||
# Validate config structure
|
||||
is_valid, error_msg = self.validate_config_content(parsed)
|
||||
if not is_valid:
|
||||
raise ValueError(f"Invalid config structure: {error_msg}")
|
||||
|
||||
# Write updated content
|
||||
with open(filepath, 'w') as f:
|
||||
f.write(yaml_content)
|
||||
|
||||
def delete_config_file(self, filename: str) -> None:
|
||||
"""
|
||||
[DEPRECATED] Delete config file and cascade delete any associated schedules.
|
||||
|
||||
When a config is deleted, all schedules using that config (both enabled
|
||||
and disabled) are automatically deleted as well, since they would be
|
||||
invalid without the config file.
|
||||
|
||||
Args:
|
||||
filename: Config filename to delete
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If config doesn't exist
|
||||
"""
|
||||
filepath = os.path.join(self.configs_dir, filename)
|
||||
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError(f"Config file '{filename}' not found")
|
||||
|
||||
# Delete any schedules using this config (both enabled and disabled)
|
||||
try:
|
||||
from web.services.schedule_service import ScheduleService
|
||||
from flask import current_app
|
||||
|
||||
# Get database session from Flask app
|
||||
db = current_app.db_session
|
||||
|
||||
# Get all schedules
|
||||
schedule_service = ScheduleService(db)
|
||||
result = schedule_service.list_schedules(page=1, per_page=10000)
|
||||
schedules = result.get('schedules', [])
|
||||
|
||||
# Build full path for comparison
|
||||
config_path = os.path.join(self.configs_dir, filename)
|
||||
|
||||
# Note: This function is deprecated. Schedules now use config_id.
|
||||
# This code path should not be reached for new configs.
|
||||
deleted_schedules = []
|
||||
import logging
|
||||
logging.getLogger(__name__).warning(
|
||||
f"delete_config_file called for '{filename}' - this is deprecated. Use database configs with config_id instead."
|
||||
)
|
||||
|
||||
if deleted_schedules:
|
||||
import logging
|
||||
logging.getLogger(__name__).info(
|
||||
f"Cascade deleted {len(deleted_schedules)} schedule(s) associated with config '{filename}': {', '.join(deleted_schedules)}"
|
||||
)
|
||||
|
||||
except ImportError:
|
||||
# If ScheduleService doesn't exist yet, skip schedule deletion
|
||||
pass
|
||||
except Exception as e:
|
||||
# Log error but continue with config deletion
|
||||
import logging
|
||||
logging.getLogger(__name__).error(
|
||||
f"Error deleting schedules for config {filename}: {e}", exc_info=True
|
||||
)
|
||||
|
||||
# Delete file
|
||||
os.remove(filepath)
|
||||
|
||||
def validate_config_content(self, content: Dict, check_site_refs: bool = True) -> Tuple[bool, str]:
|
||||
"""
|
||||
Validate parsed YAML config structure.
|
||||
|
||||
Supports both legacy format (inline IPs) and new format (site references or CIDRs).
|
||||
|
||||
Args:
|
||||
content: Parsed YAML config as dict
|
||||
check_site_refs: If True, validates that referenced sites exist in database
|
||||
|
||||
Returns:
|
||||
Tuple of (is_valid, error_message)
|
||||
"""
|
||||
if not isinstance(content, dict):
|
||||
return False, "Config must be a dictionary/object"
|
||||
|
||||
# Check required fields
|
||||
if 'title' not in content:
|
||||
return False, "Missing required field: 'title'"
|
||||
|
||||
if 'sites' not in content:
|
||||
return False, "Missing required field: 'sites'"
|
||||
|
||||
# Validate title
|
||||
if not isinstance(content['title'], str) or not content['title'].strip():
|
||||
return False, "Field 'title' must be a non-empty string"
|
||||
|
||||
# Validate sites
|
||||
sites = content['sites']
|
||||
if not isinstance(sites, list):
|
||||
return False, "Field 'sites' must be a list"
|
||||
|
||||
if len(sites) == 0:
|
||||
return False, "Must have at least one site defined"
|
||||
|
||||
# Validate each site
|
||||
for i, site in enumerate(sites):
|
||||
if not isinstance(site, dict):
|
||||
return False, f"Site {i+1} must be a dictionary/object"
|
||||
|
||||
# Check if this is a site reference (new format)
|
||||
if 'site_ref' in site:
|
||||
# Site reference format
|
||||
site_ref = site.get('site_ref')
|
||||
if not isinstance(site_ref, str) or not site_ref.strip():
|
||||
return False, f"Site {i+1} field 'site_ref' must be a non-empty string"
|
||||
|
||||
# Validate site reference exists (if check enabled)
|
||||
if check_site_refs:
|
||||
try:
|
||||
from web.services.site_service import SiteService
|
||||
from flask import current_app
|
||||
|
||||
site_service = SiteService(current_app.db_session)
|
||||
referenced_site = site_service.get_site_by_name(site_ref)
|
||||
if not referenced_site:
|
||||
return False, f"Site {i+1}: Referenced site '{site_ref}' does not exist"
|
||||
except Exception as e:
|
||||
# If we can't check (e.g., outside app context), skip validation
|
||||
pass
|
||||
|
||||
continue # Site reference is valid
|
||||
|
||||
# Check if this is inline site creation with CIDRs (new format)
|
||||
if 'cidrs' in site:
|
||||
# Inline site creation with CIDR format
|
||||
if 'name' not in site:
|
||||
return False, f"Site {i+1} with inline CIDRs missing required field: 'name'"
|
||||
|
||||
cidrs = site.get('cidrs')
|
||||
if not isinstance(cidrs, list):
|
||||
return False, f"Site {i+1} field 'cidrs' must be a list"
|
||||
|
||||
if len(cidrs) == 0:
|
||||
return False, f"Site {i+1} must have at least one CIDR"
|
||||
|
||||
# Validate each CIDR
|
||||
for j, cidr_config in enumerate(cidrs):
|
||||
if not isinstance(cidr_config, dict):
|
||||
return False, f"Site {i+1} CIDR {j+1} must be a dictionary/object"
|
||||
|
||||
if 'cidr' not in cidr_config:
|
||||
return False, f"Site {i+1} CIDR {j+1} missing required field: 'cidr'"
|
||||
|
||||
# Validate CIDR format
|
||||
cidr_str = cidr_config.get('cidr')
|
||||
try:
|
||||
ipaddress.ip_network(cidr_str, strict=False)
|
||||
except ValueError:
|
||||
return False, f"Site {i+1} CIDR {j+1}: Invalid CIDR notation '{cidr_str}'"
|
||||
|
||||
continue # Inline CIDR site is valid
|
||||
|
||||
# Legacy format: inline IPs
|
||||
if 'name' not in site:
|
||||
return False, f"Site {i+1} missing required field: 'name'"
|
||||
|
||||
if 'ips' not in site:
|
||||
return False, f"Site {i+1} missing required field: 'ips' (or use 'site_ref' or 'cidrs')"
|
||||
|
||||
if not isinstance(site['ips'], list):
|
||||
return False, f"Site {i+1} field 'ips' must be a list"
|
||||
|
||||
if len(site['ips']) == 0:
|
||||
return False, f"Site {i+1} must have at least one IP"
|
||||
|
||||
# Validate each IP
|
||||
for j, ip_config in enumerate(site['ips']):
|
||||
if not isinstance(ip_config, dict):
|
||||
return False, f"Site {i+1} IP {j+1} must be a dictionary/object"
|
||||
|
||||
if 'address' not in ip_config:
|
||||
return False, f"Site {i+1} IP {j+1} missing required field: 'address'"
|
||||
|
||||
if 'expected' not in ip_config:
|
||||
return False, f"Site {i+1} IP {j+1} missing required field: 'expected'"
|
||||
|
||||
if not isinstance(ip_config['expected'], dict):
|
||||
return False, f"Site {i+1} IP {j+1} field 'expected' must be a dictionary/object"
|
||||
|
||||
return True, ""
|
||||
|
||||
def get_schedules_using_config(self, filename: str) -> List[str]:
|
||||
"""
|
||||
Get list of schedule names using this config.
|
||||
|
||||
Args:
|
||||
filename: Config filename
|
||||
|
||||
Returns:
|
||||
List of schedule names (e.g., ["Daily Scan", "Weekly Audit"])
|
||||
"""
|
||||
# Import here to avoid circular dependency
|
||||
try:
|
||||
from web.services.schedule_service import ScheduleService
|
||||
from flask import current_app
|
||||
|
||||
# Get database session from Flask app
|
||||
db = current_app.db_session
|
||||
|
||||
# Get all schedules (use large per_page to get all)
|
||||
schedule_service = ScheduleService(db)
|
||||
result = schedule_service.list_schedules(page=1, per_page=10000)
|
||||
|
||||
# Extract schedules list from paginated result
|
||||
schedules = result.get('schedules', [])
|
||||
|
||||
# Build full path for comparison
|
||||
config_path = os.path.join(self.configs_dir, filename)
|
||||
|
||||
# Note: This function is deprecated. Schedules now use config_id.
|
||||
# Return empty list as schedules no longer use config_file.
|
||||
return []
|
||||
|
||||
except ImportError:
|
||||
# If ScheduleService doesn't exist yet, return empty list
|
||||
return []
|
||||
except Exception as e:
|
||||
# If any error occurs, return empty list (safer than failing)
|
||||
# Log the error for debugging
|
||||
import logging
|
||||
logging.getLogger(__name__).error(f"Error getting schedules using config {filename}: {e}", exc_info=True)
|
||||
return []
|
||||
|
||||
def generate_filename_from_title(self, title: str) -> str:
|
||||
"""
|
||||
Generate safe filename from scan title.
|
||||
|
||||
Args:
|
||||
title: Scan title string
|
||||
|
||||
Returns:
|
||||
Safe filename (e.g., "Prod Scan 2025" -> "prod-scan-2025.yaml")
|
||||
"""
|
||||
# Convert to lowercase
|
||||
filename = title.lower()
|
||||
|
||||
# Replace spaces with hyphens
|
||||
filename = filename.replace(' ', '-')
|
||||
|
||||
# Remove special characters (keep only alphanumeric, hyphens, underscores)
|
||||
filename = re.sub(r'[^a-z0-9\-_]', '', filename)
|
||||
|
||||
# Remove consecutive hyphens
|
||||
filename = re.sub(r'-+', '-', filename)
|
||||
|
||||
# Remove leading/trailing hyphens
|
||||
filename = filename.strip('-')
|
||||
|
||||
# Limit length (max 200 chars, reserve 5 for .yaml)
|
||||
max_length = 195
|
||||
if len(filename) > max_length:
|
||||
filename = filename[:max_length]
|
||||
|
||||
# Ensure not empty
|
||||
if not filename:
|
||||
filename = 'config'
|
||||
|
||||
# Add .yaml extension
|
||||
filename += '.yaml'
|
||||
|
||||
return filename
|
||||
|
||||
def get_config_path(self, filename: str) -> str:
|
||||
"""
|
||||
Get absolute path for a config file.
|
||||
|
||||
Args:
|
||||
filename: Config filename
|
||||
|
||||
Returns:
|
||||
Absolute path to config file
|
||||
"""
|
||||
return os.path.join(self.configs_dir, filename)
|
||||
|
||||
def config_exists(self, filename: str) -> bool:
|
||||
"""
|
||||
Check if a config file exists.
|
||||
|
||||
Args:
|
||||
filename: Config filename
|
||||
|
||||
Returns:
|
||||
True if file exists, False otherwise
|
||||
"""
|
||||
filepath = os.path.join(self.configs_dir, filename)
|
||||
return os.path.exists(filepath) and os.path.isfile(filepath)
|
||||
|
||||
def create_inline_sites(self, config_content: Dict) -> None:
|
||||
"""
|
||||
Create sites in the database for inline site definitions in a config.
|
||||
|
||||
This method scans the config for inline site definitions (with CIDRs)
|
||||
and creates them as reusable sites in the database if they don't already exist.
|
||||
|
||||
Args:
|
||||
config_content: Parsed YAML config dictionary
|
||||
|
||||
Raises:
|
||||
ValueError: If site creation fails
|
||||
"""
|
||||
try:
|
||||
from web.services.site_service import SiteService
|
||||
from flask import current_app
|
||||
|
||||
site_service = SiteService(current_app.db_session)
|
||||
|
||||
sites = config_content.get('sites', [])
|
||||
|
||||
for site_def in sites:
|
||||
# Skip site references (they already exist)
|
||||
if 'site_ref' in site_def:
|
||||
continue
|
||||
|
||||
# Skip legacy IP-based sites (not creating those as reusable sites)
|
||||
if 'ips' in site_def and 'cidrs' not in site_def:
|
||||
continue
|
||||
|
||||
# Process inline CIDR-based sites
|
||||
if 'cidrs' in site_def:
|
||||
site_name = site_def.get('name')
|
||||
|
||||
# Check if site already exists
|
||||
existing_site = site_service.get_site_by_name(site_name)
|
||||
if existing_site:
|
||||
# Site already exists, skip creation
|
||||
continue
|
||||
|
||||
# Create new site
|
||||
cidrs = site_def.get('cidrs', [])
|
||||
description = f"Auto-created from config '{config_content.get('title', 'Unknown')}'"
|
||||
|
||||
site_service.create_site(
|
||||
name=site_name,
|
||||
description=description,
|
||||
cidrs=cidrs
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# If site creation fails, log but don't block config creation
|
||||
import logging
|
||||
logging.getLogger(__name__).warning(
|
||||
f"Failed to create inline sites from config: {str(e)}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user