Files
SneakyScan/app/web/api/sites.py
Phillip Tarrant 0ec338e252 Migrate from file-based configs to database with per-IP site configuration
Major architectural changes:
   - Replace YAML config files with database-stored ScanConfig model
   - Remove CIDR block support in favor of individual IP addresses per site
   - Each IP now has its own expected_ping, expected_tcp_ports, expected_udp_ports
   - AlertRule now uses config_id FK instead of config_file string

   API changes:
   - POST /api/scans now requires config_id instead of config_file
   - Alert rules API uses config_id with validation
   - All config dropdowns fetch from /api/configs dynamically

   Template updates:
   - scans.html, dashboard.html, alert_rules.html load configs via API
   - Display format: Config Title (X sites) in dropdowns
   - Removed Jinja2 config_files loops

   Migrations:
   - 008: Expand CIDRs to individual IPs with per-IP port configs
   - 009: Remove CIDR-related columns
   - 010: Add config_id to alert_rules, remove config_file
2025-11-19 19:40:34 -06:00

656 lines
21 KiB
Python

"""
Sites API blueprint.
Handles endpoints for managing reusable site definitions, including CIDR ranges
and IP-level overrides.
"""
import logging
from flask import Blueprint, current_app, jsonify, request
from sqlalchemy.exc import SQLAlchemyError
from web.auth.decorators import api_auth_required
from web.services.site_service import SiteService
from web.utils.pagination import validate_page_params
bp = Blueprint('sites', __name__)
logger = logging.getLogger(__name__)
@bp.route('', methods=['GET'])
@api_auth_required
def list_sites():
"""
List all sites with pagination.
Query params:
page: Page number (default: 1)
per_page: Items per page (default: 20, max: 100)
all: If 'true', returns all sites without pagination (for dropdowns)
Returns:
JSON response with sites list and pagination info
"""
try:
# Check if requesting all sites (no pagination)
if request.args.get('all', '').lower() == 'true':
site_service = SiteService(current_app.db_session)
sites = site_service.list_all_sites()
logger.info(f"Listed all sites (count={len(sites)})")
return jsonify({'sites': sites})
# Get and validate query parameters
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# Validate pagination params
page, per_page = validate_page_params(page, per_page)
# Get sites from service
site_service = SiteService(current_app.db_session)
paginated_result = site_service.list_sites(page=page, per_page=per_page)
logger.info(f"Listed sites: page={page}, per_page={per_page}, total={paginated_result.total}")
return jsonify({
'sites': paginated_result.items,
'total': paginated_result.total,
'page': paginated_result.page,
'per_page': paginated_result.per_page,
'total_pages': paginated_result.pages,
'has_prev': paginated_result.has_prev,
'has_next': paginated_result.has_next
})
except ValueError as e:
logger.warning(f"Invalid request parameters: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error listing sites: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve sites'
}), 500
except Exception as e:
logger.error(f"Unexpected error listing sites: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>', methods=['GET'])
@api_auth_required
def get_site(site_id):
"""
Get details for a specific site.
Args:
site_id: Site ID
Returns:
JSON response with site details including CIDRs and IP overrides
"""
try:
site_service = SiteService(current_app.db_session)
site = site_service.get_site(site_id)
if not site:
logger.warning(f"Site not found: {site_id}")
return jsonify({
'error': 'Not found',
'message': f'Site with ID {site_id} not found'
}), 404
logger.info(f"Retrieved site details: {site_id}")
return jsonify(site)
except SQLAlchemyError as e:
logger.error(f"Database error retrieving site {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve site'
}), 500
except Exception as e:
logger.error(f"Unexpected error retrieving site {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('', methods=['POST'])
@api_auth_required
def create_site():
"""
Create a new site.
Request body:
name: Site name (required, must be unique)
description: Site description (optional)
cidrs: List of CIDR definitions (optional, but recommended)
[
{
"cidr": "10.0.0.0/24",
"expected_ping": true,
"expected_tcp_ports": [22, 80, 443],
"expected_udp_ports": [53]
}
]
Returns:
JSON response with created site data
"""
try:
data = request.get_json() or {}
# Validate required fields
name = data.get('name')
if not name:
logger.warning("Site creation request missing name")
return jsonify({
'error': 'Invalid request',
'message': 'name is required'
}), 400
description = data.get('description')
# Create site (empty initially)
site_service = SiteService(current_app.db_session)
site = site_service.create_site(
name=name,
description=description
)
logger.info(f"Created site '{name}' (id={site['id']})")
return jsonify(site), 201
except ValueError as e:
logger.warning(f"Invalid site creation request: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error creating site: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to create site'
}), 500
except Exception as e:
logger.error(f"Unexpected error creating site: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>', methods=['PUT'])
@api_auth_required
def update_site(site_id):
"""
Update site metadata (name and/or description).
Args:
site_id: Site ID
Request body:
name: New site name (optional, must be unique)
description: New description (optional)
Returns:
JSON response with updated site data
"""
try:
data = request.get_json() or {}
name = data.get('name')
description = data.get('description')
# Update site
site_service = SiteService(current_app.db_session)
site = site_service.update_site(
site_id=site_id,
name=name,
description=description
)
logger.info(f"Updated site {site_id}")
return jsonify(site)
except ValueError as e:
logger.warning(f"Invalid site update request: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error updating site {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to update site'
}), 500
except Exception as e:
logger.error(f"Unexpected error updating site {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>', methods=['DELETE'])
@api_auth_required
def delete_site(site_id):
"""
Delete a site.
Prevents deletion if site is used in any scan.
Args:
site_id: Site ID
Returns:
JSON response with success message
"""
try:
site_service = SiteService(current_app.db_session)
site_service.delete_site(site_id)
logger.info(f"Deleted site {site_id}")
return jsonify({
'message': f'Site {site_id} deleted successfully'
})
except ValueError as e:
logger.warning(f"Cannot delete site {site_id}: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error deleting site {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to delete site'
}), 500
except Exception as e:
logger.error(f"Unexpected error deleting site {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>/ips/bulk', methods=['POST'])
@api_auth_required
def bulk_add_ips(site_id):
"""
Bulk add IPs to a site from CIDR or list.
Args:
site_id: Site ID
Request body:
source_type: "cidr" or "list" (required)
cidr: CIDR notation if source_type="cidr" (e.g., "10.0.0.0/24")
ips: List of IP addresses if source_type="list" (e.g., ["10.0.0.1", "10.0.0.2"])
expected_ping: Expected ping response for all IPs (optional)
expected_tcp_ports: List of expected TCP ports for all IPs (optional)
expected_udp_ports: List of expected UDP ports for all IPs (optional)
Returns:
JSON response with count of IPs added and any errors
"""
try:
data = request.get_json() or {}
source_type = data.get('source_type')
if source_type not in ['cidr', 'list']:
return jsonify({
'error': 'Invalid request',
'message': 'source_type must be "cidr" or "list"'
}), 400
expected_ping = data.get('expected_ping')
expected_tcp_ports = data.get('expected_tcp_ports', [])
expected_udp_ports = data.get('expected_udp_ports', [])
site_service = SiteService(current_app.db_session)
if source_type == 'cidr':
cidr = data.get('cidr')
if not cidr:
return jsonify({
'error': 'Invalid request',
'message': 'cidr is required when source_type="cidr"'
}), 400
result = site_service.bulk_add_ips_from_cidr(
site_id=site_id,
cidr=cidr,
expected_ping=expected_ping,
expected_tcp_ports=expected_tcp_ports,
expected_udp_ports=expected_udp_ports
)
logger.info(f"Bulk added {result['ip_count']} IPs from CIDR '{cidr}' to site {site_id}")
return jsonify(result), 201
else: # source_type == 'list'
ip_list = data.get('ips', [])
if not isinstance(ip_list, list):
return jsonify({
'error': 'Invalid request',
'message': 'ips must be a list when source_type="list"'
}), 400
result = site_service.bulk_add_ips_from_list(
site_id=site_id,
ip_list=ip_list,
expected_ping=expected_ping,
expected_tcp_ports=expected_tcp_ports,
expected_udp_ports=expected_udp_ports
)
logger.info(f"Bulk added {result['ip_count']} IPs from list to site {site_id}")
return jsonify(result), 201
except ValueError as e:
logger.warning(f"Invalid bulk IP request: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error bulk adding IPs to site {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to add IPs'
}), 500
except Exception as e:
logger.error(f"Unexpected error bulk adding IPs to site {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>/ips', methods=['GET'])
@api_auth_required
def list_ips(site_id):
"""
List IPs in a site with pagination.
Query params:
page: Page number (default: 1)
per_page: Items per page (default: 50, max: 200)
Returns:
JSON response with IPs list and pagination info
"""
try:
# Get and validate query parameters
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
# Validate pagination params
page, per_page = validate_page_params(page, per_page, max_per_page=200)
# Get IPs from service
site_service = SiteService(current_app.db_session)
paginated_result = site_service.list_ips(
site_id=site_id,
page=page,
per_page=per_page
)
logger.info(f"Listed IPs for site {site_id}: page={page}, per_page={per_page}, total={paginated_result.total}")
return jsonify({
'ips': paginated_result.items,
'total': paginated_result.total,
'page': paginated_result.page,
'per_page': paginated_result.per_page,
'total_pages': paginated_result.pages,
'has_prev': paginated_result.has_prev,
'has_next': paginated_result.has_next
})
except ValueError as e:
logger.warning(f"Invalid request parameters: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error listing IPs for site {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve IPs'
}), 500
except Exception as e:
logger.error(f"Unexpected error listing IPs for site {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>/ips', methods=['POST'])
@api_auth_required
def add_standalone_ip(site_id):
"""
Add a standalone IP (without CIDR parent) to a site.
Args:
site_id: Site ID
Request body:
ip_address: IP address (required)
expected_ping: Expected ping response (optional)
expected_tcp_ports: List of expected TCP ports (optional)
expected_udp_ports: List of expected UDP ports (optional)
Returns:
JSON response with created IP data
"""
try:
data = request.get_json() or {}
# Validate required fields
ip_address = data.get('ip_address')
if not ip_address:
logger.warning("Standalone IP creation request missing ip_address")
return jsonify({
'error': 'Invalid request',
'message': 'ip_address is required'
}), 400
expected_ping = data.get('expected_ping')
expected_tcp_ports = data.get('expected_tcp_ports', [])
expected_udp_ports = data.get('expected_udp_ports', [])
# Add standalone IP
site_service = SiteService(current_app.db_session)
ip_data = site_service.add_standalone_ip(
site_id=site_id,
ip_address=ip_address,
expected_ping=expected_ping,
expected_tcp_ports=expected_tcp_ports,
expected_udp_ports=expected_udp_ports
)
logger.info(f"Added standalone IP '{ip_address}' to site {site_id}")
return jsonify(ip_data), 201
except ValueError as e:
logger.warning(f"Invalid standalone IP creation request: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error adding standalone IP to site {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to add IP'
}), 500
except Exception as e:
logger.error(f"Unexpected error adding standalone IP to site {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>/ips/<int:ip_id>', methods=['PUT'])
@api_auth_required
def update_ip_settings(site_id, ip_id):
"""
Update settings for an individual IP.
Args:
site_id: Site ID
ip_id: IP ID
Request body:
expected_ping: New ping expectation (optional)
expected_tcp_ports: New TCP ports expectation (optional)
expected_udp_ports: New UDP ports expectation (optional)
Returns:
JSON response with updated IP data
"""
try:
data = request.get_json() or {}
expected_ping = data.get('expected_ping')
expected_tcp_ports = data.get('expected_tcp_ports')
expected_udp_ports = data.get('expected_udp_ports')
# Update IP settings
site_service = SiteService(current_app.db_session)
ip_data = site_service.update_ip_settings(
site_id=site_id,
ip_id=ip_id,
expected_ping=expected_ping,
expected_tcp_ports=expected_tcp_ports,
expected_udp_ports=expected_udp_ports
)
logger.info(f"Updated IP {ip_id} in site {site_id}")
return jsonify(ip_data)
except ValueError as e:
logger.warning(f"Invalid IP update request: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error updating IP {ip_id} in site {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to update IP'
}), 500
except Exception as e:
logger.error(f"Unexpected error updating IP {ip_id} in site {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>/ips/<int:ip_id>', methods=['DELETE'])
@api_auth_required
def remove_ip(site_id, ip_id):
"""
Remove an IP from a site.
Args:
site_id: Site ID
ip_id: IP ID
Returns:
JSON response with success message
"""
try:
site_service = SiteService(current_app.db_session)
site_service.remove_ip(site_id, ip_id)
logger.info(f"Removed IP {ip_id} from site {site_id}")
return jsonify({
'message': f'IP {ip_id} removed successfully'
})
except ValueError as e:
logger.warning(f"Cannot remove IP {ip_id}: {str(e)}")
return jsonify({
'error': 'Invalid request',
'message': str(e)
}), 400
except SQLAlchemyError as e:
logger.error(f"Database error removing IP {ip_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to remove IP'
}), 500
except Exception as e:
logger.error(f"Unexpected error removing IP {ip_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500
@bp.route('/<int:site_id>/usage', methods=['GET'])
@api_auth_required
def get_site_usage(site_id):
"""
Get list of scans that use this site.
Args:
site_id: Site ID
Returns:
JSON response with list of scans
"""
try:
site_service = SiteService(current_app.db_session)
# First check if site exists
site = site_service.get_site(site_id)
if not site:
logger.warning(f"Site not found: {site_id}")
return jsonify({
'error': 'Not found',
'message': f'Site with ID {site_id} not found'
}), 404
scans = site_service.get_scan_usage(site_id)
logger.info(f"Retrieved usage for site {site_id} (count={len(scans)})")
return jsonify({
'site_id': site_id,
'site_name': site['name'],
'scans': scans,
'count': len(scans)
})
except SQLAlchemyError as e:
logger.error(f"Database error retrieving site usage {site_id}: {str(e)}")
return jsonify({
'error': 'Database error',
'message': 'Failed to retrieve site usage'
}), 500
except Exception as e:
logger.error(f"Unexpected error retrieving site usage {site_id}: {str(e)}", exc_info=True)
return jsonify({
'error': 'Internal server error',
'message': 'An unexpected error occurred'
}), 500