Migrate from file-based configs to database with per-IP site configuration
Major architectural changes:
- Replace YAML config files with database-stored ScanConfig model
- Remove CIDR block support in favor of individual IP addresses per site
- Each IP now has its own expected_ping, expected_tcp_ports, expected_udp_ports
- AlertRule now uses config_id FK instead of config_file string

API changes:
- POST /api/scans now requires config_id instead of config_file
- Alert rules API uses config_id with validation
- All config dropdowns fetch from /api/configs dynamically

Template updates:
- scans.html, dashboard.html, alert_rules.html load configs via API
- Display format: Config Title (X sites) in dropdowns
- Removed Jinja2 config_files loops

Migrations:
- 008: Expand CIDRs to individual IPs with per-IP port configs
- 009: Remove CIDR-related columns
- 010: Add config_id to alert_rules, remove config_file
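For reference, the request shape described above might look roughly like this from a client; the base URL and any /api/configs response fields other than id are illustrative assumptions, not details taken from this commit:

# Hypothetical client-side sketch of the config_file -> config_id change.
import requests

BASE = "http://localhost:8000"  # assumed development server address

# Old shape (pre-migration): a scan referenced a YAML config file on disk.
# requests.post(f"{BASE}/api/scans", json={"config_file": "site-a.yaml"})

# New shape: pick a stored config from /api/configs, then submit its id.
configs = requests.get(f"{BASE}/api/configs").json()
config_id = configs[0]["id"]  # assumes a list of config dicts with an "id" field

response = requests.post(f"{BASE}/api/scans", json={"config_id": config_id})
response.raise_for_status()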
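The per-IP expansion behind migration 008 and the new _expand_cidr_to_ips() helper further down can be sketched with the standard library alone; the expand() function here is illustrative and not part of the commit:

# How a CIDR becomes individual per-IP rows: /32 and /31 keep every address,
# while larger networks use hosts() so network/broadcast addresses are skipped.
import ipaddress

def expand(cidr: str) -> list[str]:
    net = ipaddress.ip_network(cidr, strict=False)
    if net.num_addresses == 1:        # /32 or /128: a single host
        addrs = [net.network_address]
    elif net.num_addresses == 2:      # /31 or /127: point-to-point, both usable
        addrs = [net.network_address, net.broadcast_address]
    else:
        addrs = list(net.hosts())     # excludes network and broadcast addresses
    return [str(ip) for ip in addrs]

print(expand("10.0.0.4/31"))       # ['10.0.0.4', '10.0.0.5']
print(len(expand("10.0.0.0/24")))  # 254 usable IPs, the largest IPv4 range allowed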
@@ -129,7 +129,7 @@ class ConfigService:
                 'id': site.id,
                 'name': site.name,
                 'description': site.description,
-                'cidr_count': len(site.cidrs)
+                'ip_count': len(site.ips)
             })

         return {

@@ -16,7 +16,7 @@ from sqlalchemy.orm import Session, joinedload
 from sqlalchemy.exc import IntegrityError

 from web.models import (
-    Site, SiteCIDR, SiteIP, ScanSiteAssociation
+    Site, SiteIP, ScanSiteAssociation
 )
 from web.utils.pagination import paginate, PaginatedResult

@@ -40,34 +40,26 @@ class SiteService:
         """
         self.db = db_session

-    def create_site(self, name: str, description: Optional[str] = None,
-                    cidrs: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
+    def create_site(self, name: str, description: Optional[str] = None) -> Dict[str, Any]:
         """
-        Create a new site with optional CIDR ranges.
+        Create a new site.

         Args:
             name: Unique site name
             description: Optional site description
-            cidrs: List of CIDR definitions with format:
-                [{"cidr": "10.0.0.0/24", "expected_ping": true,
-                  "expected_tcp_ports": [22, 80], "expected_udp_ports": [53]}]

         Returns:
             Dictionary with created site data

         Raises:
-            ValueError: If site name already exists or validation fails
+            ValueError: If site name already exists
         """
         # Validate site name is unique
         existing = self.db.query(Site).filter(Site.name == name).first()
         if existing:
             raise ValueError(f"Site with name '{name}' already exists")

-        # Validate we have at least one CIDR if provided
-        if cidrs is not None and len(cidrs) == 0:
-            raise ValueError("Site must have at least one CIDR range")
-
-        # Create site
+        # Create site (can be empty, IPs added separately)
         site = Site(
             name=name,
             description=description,
@@ -76,17 +68,10 @@ class SiteService:
|
||||
)
|
||||
|
||||
self.db.add(site)
|
||||
self.db.flush() # Get site.id without committing
|
||||
|
||||
# Add CIDRs if provided
|
||||
if cidrs:
|
||||
for cidr_data in cidrs:
|
||||
self._add_cidr_to_site(site, cidr_data)
|
||||
|
||||
self.db.commit()
|
||||
self.db.refresh(site)
|
||||
|
||||
logger.info(f"Created site '{name}' (id={site.id}) with {len(cidrs or [])} CIDR(s)")
|
||||
logger.info(f"Created site '{name}' (id={site.id})")
|
||||
|
||||
return self._site_to_dict(site)
|
||||
|
||||
@@ -171,7 +156,7 @@ class SiteService:

     def get_site(self, site_id: int) -> Optional[Dict[str, Any]]:
         """
-        Get site details with all CIDRs and IP overrides.
+        Get site details.

         Args:
             site_id: Site ID to retrieve
@@ -181,9 +166,6 @@ class SiteService:
         """
         site = (
             self.db.query(Site)
-            .options(
-                joinedload(Site.cidrs).joinedload(SiteCIDR.ips)
-            )
             .filter(Site.id == site_id)
             .first()
         )
@@ -205,9 +187,6 @@ class SiteService:
         """
         site = (
             self.db.query(Site)
-            .options(
-                joinedload(Site.cidrs).joinedload(SiteCIDR.ips)
-            )
             .filter(Site.name == name)
             .first()
         )
@@ -230,7 +209,6 @@ class SiteService:
         """
         query = (
             self.db.query(Site)
-            .options(joinedload(Site.cidrs))
             .order_by(Site.name)
         )

@@ -245,160 +223,211 @@ class SiteService:
         """
         sites = (
             self.db.query(Site)
-            .options(joinedload(Site.cidrs))
             .order_by(Site.name)
             .all()
         )

         return [self._site_to_dict(site) for site in sites]

-    def add_cidr(self, site_id: int, cidr: str, expected_ping: Optional[bool] = None,
-                 expected_tcp_ports: Optional[List[int]] = None,
-                 expected_udp_ports: Optional[List[int]] = None) -> Dict[str, Any]:
+    def bulk_add_ips_from_cidr(self, site_id: int, cidr: str,
+                               expected_ping: Optional[bool] = None,
+                               expected_tcp_ports: Optional[List[int]] = None,
+                               expected_udp_ports: Optional[List[int]] = None) -> Dict[str, Any]:
         """
-        Add a CIDR range to a site.
+        Expand a CIDR range and add all IPs to a site.
+
+        CIDRs are NOT stored - they are just used to generate IP records.

         Args:
             site_id: Site ID
             cidr: CIDR notation (e.g., "10.0.0.0/24")
-            expected_ping: Expected ping response for IPs in this CIDR
+            expected_ping: Expected ping response for all IPs
             expected_tcp_ports: List of expected TCP ports for all IPs
             expected_udp_ports: List of expected UDP ports for all IPs

         Returns:
+            Dictionary with:
+                - cidr: The CIDR that was expanded
+                - ip_count: Number of IPs created
+                - ips_added: List of IP addresses created
+                - ips_skipped: List of IPs that already existed

         Raises:
+            ValueError: If site not found or CIDR is invalid/too large
         """
         site = self.db.query(Site).filter(Site.id == site_id).first()
         if not site:
             raise ValueError(f"Site with id {site_id} not found")

+        # Validate CIDR format and size
+        try:
+            network = ipaddress.ip_network(cidr, strict=False)
+        except ValueError as e:
+            raise ValueError(f"Invalid CIDR notation '{cidr}': {str(e)}")
+
+        # Enforce CIDR size limits (max /24 for IPv4, /64 for IPv6)
+        if isinstance(network, ipaddress.IPv4Network) and network.prefixlen < 24:
+            raise ValueError(
+                f"CIDR '{cidr}' is too large ({network.num_addresses} IPs). "
+                f"Maximum allowed is /24 (256 IPs) for IPv4."
+            )
+        elif isinstance(network, ipaddress.IPv6Network) and network.prefixlen < 64:
+            raise ValueError(
+                f"CIDR '{cidr}' is too large. "
+                f"Maximum allowed is /64 for IPv6."
+            )
+
+        # Expand CIDR to individual IPs (no cidr_id since we're not storing CIDR)
+        ip_count, ips_added, ips_skipped = self._expand_cidr_to_ips(
+            site_id=site_id,
+            network=network,
+            expected_ping=expected_ping,
+            expected_tcp_ports=expected_tcp_ports or [],
+            expected_udp_ports=expected_udp_ports or []
+        )

         site.updated_at = datetime.utcnow()
         self.db.commit()

+        logger.info(
+            f"Expanded CIDR '{cidr}' for site {site_id} ('{site.name}'): "
+            f"added {ip_count} IPs, skipped {len(ips_skipped)} duplicates"
+        )

+        return {
+            'cidr': cidr,
+            'ip_count': ip_count,
+            'ips_added': ips_added,
+            'ips_skipped': ips_skipped
+        }

+    def bulk_add_ips_from_list(self, site_id: int, ip_list: List[str],
+                               expected_ping: Optional[bool] = None,
+                               expected_tcp_ports: Optional[List[int]] = None,
+                               expected_udp_ports: Optional[List[int]] = None) -> Dict[str, Any]:
+        """
+        Add multiple IPs from a list (e.g., from CSV/text import).
+
+        Args:
+            site_id: Site ID
+            ip_list: List of IP addresses as strings
+            expected_ping: Expected ping response for all IPs
+            expected_tcp_ports: List of expected TCP ports for all IPs
+            expected_udp_ports: List of expected UDP ports for all IPs
+
+        Returns:
+            Dictionary with:
+                - ip_count: Number of IPs successfully created
+                - ips_added: List of IP addresses created
+                - ips_skipped: List of IPs that already existed
+                - errors: List of validation errors {ip: error_message}
+
+        Raises:
+            ValueError: If site not found
+        """
+        site = self.db.query(Site).filter(Site.id == site_id).first()
+        if not site:
+            raise ValueError(f"Site with id {site_id} not found")
+
+        ips_added = []
+        ips_skipped = []
+        errors = []
+
+        for ip_str in ip_list:
+            ip_str = ip_str.strip()
+            if not ip_str:
+                continue # Skip empty lines
+
+            # Validate IP format
+            try:
+                ipaddress.ip_address(ip_str)
+            except ValueError as e:
+                errors.append({'ip': ip_str, 'error': f"Invalid IP address: {str(e)}"})
+                continue
+
+            # Check for duplicate (across all IPs in the site)
+            existing = (
+                self.db.query(SiteIP)
+                .filter(SiteIP.site_id == site_id, SiteIP.ip_address == ip_str)
+                .first()
+            )
+            if existing:
+                ips_skipped.append(ip_str)
+                continue
+
+            # Create IP record
+            try:
+                ip_obj = SiteIP(
+                    site_id=site_id,
+                    ip_address=ip_str,
+                    expected_ping=expected_ping,
+                    expected_tcp_ports=json.dumps(expected_tcp_ports or []),
+                    expected_udp_ports=json.dumps(expected_udp_ports or []),
+                    created_at=datetime.utcnow()
+                )
+
+                self.db.add(ip_obj)
+                ips_added.append(ip_str)
+            except Exception as e:
+                errors.append({'ip': ip_str, 'error': f"Database error: {str(e)}"})
+
+        site.updated_at = datetime.utcnow()
+        self.db.commit()
+
+        logger.info(
+            f"Bulk added {len(ips_added)} IPs to site {site_id} ('{site.name}'), "
+            f"skipped {len(ips_skipped)} duplicates, {len(errors)} errors"
+        )
+
+        return {
+            'ip_count': len(ips_added),
+            'ips_added': ips_added,
+            'ips_skipped': ips_skipped,
+            'errors': errors
+        }

+    def add_standalone_ip(self, site_id: int, ip_address: str,
+                          expected_ping: Optional[bool] = None,
+                          expected_tcp_ports: Optional[List[int]] = None,
+                          expected_udp_ports: Optional[List[int]] = None) -> Dict[str, Any]:
         """
+        Add a standalone IP (without a CIDR parent) to a site.

         Args:
             site_id: Site ID
+            ip_address: IP address to add
+            expected_ping: Expected ping response
+            expected_tcp_ports: List of expected TCP ports
+            expected_udp_ports: List of expected UDP ports

         Returns:
-            Dictionary with CIDR data
+            Dictionary with IP data

         Raises:
-            ValueError: If site not found, CIDR is invalid, or already exists
+            ValueError: If site not found, IP is invalid, or already exists
         """
         site = self.db.query(Site).filter(Site.id == site_id).first()
         if not site:
             raise ValueError(f"Site with id {site_id} not found")

-        # Validate CIDR format
-        try:
-            ipaddress.ip_network(cidr, strict=False)
-        except ValueError as e:
-            raise ValueError(f"Invalid CIDR notation '{cidr}': {str(e)}")
-
-        # Check for duplicate CIDR
-        existing = (
-            self.db.query(SiteCIDR)
-            .filter(SiteCIDR.site_id == site_id, SiteCIDR.cidr == cidr)
-            .first()
-        )
-        if existing:
-            raise ValueError(f"CIDR '{cidr}' already exists for this site")
-
-        # Create CIDR
-        cidr_obj = SiteCIDR(
-            site_id=site_id,
-            cidr=cidr,
-            expected_ping=expected_ping,
-            expected_tcp_ports=json.dumps(expected_tcp_ports or []),
-            expected_udp_ports=json.dumps(expected_udp_ports or []),
-            created_at=datetime.utcnow()
-        )
-
-        self.db.add(cidr_obj)
-        site.updated_at = datetime.utcnow()
-        self.db.commit()
-        self.db.refresh(cidr_obj)
-
-        logger.info(f"Added CIDR '{cidr}' to site {site_id} ('{site.name}')")
-
-        return self._cidr_to_dict(cidr_obj)

-    def remove_cidr(self, site_id: int, cidr_id: int) -> None:
-        """
-        Remove a CIDR range from a site.
-
-        Prevents removal if it's the last CIDR (sites must have at least one CIDR).
-
-        Args:
-            site_id: Site ID
-            cidr_id: CIDR ID to remove
-
-        Raises:
-            ValueError: If CIDR not found or it's the last CIDR
-        """
-        site = self.db.query(Site).filter(Site.id == site_id).first()
-        if not site:
-            raise ValueError(f"Site with id {site_id} not found")
-
-        cidr = (
-            self.db.query(SiteCIDR)
-            .filter(SiteCIDR.id == cidr_id, SiteCIDR.site_id == site_id)
-            .first()
-        )
-        if not cidr:
-            raise ValueError(f"CIDR with id {cidr_id} not found for site {site_id}")
-
-        # Check if this is the last CIDR
-        cidr_count = (
-            self.db.query(func.count(SiteCIDR.id))
-            .filter(SiteCIDR.site_id == site_id)
-            .scalar()
-        )
-
-        if cidr_count <= 1:
-            raise ValueError(
-                f"Cannot remove CIDR '{cidr.cidr}': site must have at least one CIDR range"
-            )
-
-        self.db.delete(cidr)
-        site.updated_at = datetime.utcnow()
-        self.db.commit()
-
-        logger.info(f"Removed CIDR '{cidr.cidr}' from site {site_id} ('{site.name}')")

-    def add_ip_override(self, cidr_id: int, ip_address: str,
-                        expected_ping: Optional[bool] = None,
-                        expected_tcp_ports: Optional[List[int]] = None,
-                        expected_udp_ports: Optional[List[int]] = None) -> Dict[str, Any]:
         """
-        Add an IP-level expectation override within a CIDR.

         Args:
-            cidr_id: CIDR ID
-            ip_address: IP address to override
-            expected_ping: Override ping expectation
-            expected_tcp_ports: Override TCP ports expectation
-            expected_udp_ports: Override UDP ports expectation

         Returns:
-            Dictionary with IP override data

         Raises:
-            ValueError: If CIDR not found, IP is invalid, or not in CIDR range
         """
-        cidr = self.db.query(SiteCIDR).filter(SiteCIDR.id == cidr_id).first()
-        if not cidr:
-            raise ValueError(f"CIDR with id {cidr_id} not found")

         # Validate IP format
         try:
-            ip_obj = ipaddress.ip_address(ip_address)
+            ipaddress.ip_address(ip_address)
         except ValueError as e:
             raise ValueError(f"Invalid IP address '{ip_address}': {str(e)}")

-        # Validate IP is within CIDR range
-        network = ipaddress.ip_network(cidr.cidr, strict=False)
-        if ip_obj not in network:
-            raise ValueError(f"IP address '{ip_address}' is not within CIDR '{cidr.cidr}'")

-        # Check for duplicate
+        # Check for duplicate (across all IPs in the site)
         existing = (
             self.db.query(SiteIP)
-            .filter(SiteIP.site_cidr_id == cidr_id, SiteIP.ip_address == ip_address)
+            .filter(SiteIP.site_id == site_id, SiteIP.ip_address == ip_address)
             .first()
         )
         if existing:
-            raise ValueError(f"IP override for '{ip_address}' already exists in this CIDR")
+            raise ValueError(f"IP '{ip_address}' already exists in this site")

-        # Create IP override
-        ip_override = SiteIP(
-            site_cidr_id=cidr_id,
+        # Create IP
+        ip_obj = SiteIP(
+            site_id=site_id,
             ip_address=ip_address,
             expected_ping=expected_ping,
             expected_tcp_ports=json.dumps(expected_tcp_ports or []),
@@ -406,38 +435,102 @@ class SiteService:
             created_at=datetime.utcnow()
         )

-        self.db.add(ip_override)
+        self.db.add(ip_obj)
         site.updated_at = datetime.utcnow()
         self.db.commit()
-        self.db.refresh(ip_override)
+        self.db.refresh(ip_obj)

-        logger.info(f"Added IP override '{ip_address}' to CIDR {cidr_id} ('{cidr.cidr}')")
+        logger.info(f"Added IP '{ip_address}' to site {site_id} ('{site.name}')")

-        return self._ip_override_to_dict(ip_override)
+        return self._ip_to_dict(ip_obj)

-    def remove_ip_override(self, cidr_id: int, ip_id: int) -> None:
+    def update_ip_settings(self, site_id: int, ip_id: int,
+                           expected_ping: Optional[bool] = None,
+                           expected_tcp_ports: Optional[List[int]] = None,
+                           expected_udp_ports: Optional[List[int]] = None) -> Dict[str, Any]:
         """
-        Remove an IP-level override.
+        Update settings for an individual IP.

         Args:
-            cidr_id: CIDR ID
-            ip_id: IP override ID to remove
+            site_id: Site ID
+            ip_id: IP ID to update
+            expected_ping: New ping expectation (if provided)
+            expected_tcp_ports: New TCP ports expectation (if provided)
+            expected_udp_ports: New UDP ports expectation (if provided)
+
+        Returns:
+            Dictionary with updated IP data

         Raises:
-            ValueError: If IP override not found
+            ValueError: If IP not found
         """
-        ip_override = (
+        ip_obj = (
             self.db.query(SiteIP)
-            .filter(SiteIP.id == ip_id, SiteIP.site_cidr_id == cidr_id)
+            .filter(SiteIP.id == ip_id, SiteIP.site_id == site_id)
             .first()
         )
-        if not ip_override:
-            raise ValueError(f"IP override with id {ip_id} not found for CIDR {cidr_id}")
+        if not ip_obj:
+            raise ValueError(f"IP with id {ip_id} not found for site {site_id}")

-        ip_address = ip_override.ip_address
-        self.db.delete(ip_override)
+        # Update settings if provided
+        if expected_ping is not None:
+            ip_obj.expected_ping = expected_ping
+        if expected_tcp_ports is not None:
+            ip_obj.expected_tcp_ports = json.dumps(expected_tcp_ports)
+        if expected_udp_ports is not None:
+            ip_obj.expected_udp_ports = json.dumps(expected_udp_ports)

         self.db.commit()
+        self.db.refresh(ip_obj)

+        logger.info(f"Updated settings for IP '{ip_obj.ip_address}' in site {site_id}")
+
+        return self._ip_to_dict(ip_obj)

+    def remove_ip(self, site_id: int, ip_id: int) -> None:
+        """
+        Remove an IP from a site.
+
+        Args:
+            site_id: Site ID
+            ip_id: IP ID to remove
+
+        Raises:
+            ValueError: If IP not found
+        """
+        ip_obj = (
+            self.db.query(SiteIP)
+            .filter(SiteIP.id == ip_id, SiteIP.site_id == site_id)
+            .first()
+        )
+        if not ip_obj:
+            raise ValueError(f"IP with id {ip_id} not found for site {site_id}")
+
+        ip_address = ip_obj.ip_address
+        self.db.delete(ip_obj)
+        self.db.commit()

-        logger.info(f"Removed IP override '{ip_address}' from CIDR {cidr_id}")
+        logger.info(f"Removed IP '{ip_address}' from site {site_id}")

+    def list_ips(self, site_id: int, page: int = 1, per_page: int = 50) -> PaginatedResult:
+        """
+        List IPs in a site with pagination.
+
+        Args:
+            site_id: Site ID
+            page: Page number (1-indexed)
+            per_page: Number of items per page
+
+        Returns:
+            PaginatedResult with IP data
+        """
+        query = (
+            self.db.query(SiteIP)
+            .filter(SiteIP.site_id == site_id)
+            .order_by(SiteIP.ip_address)
+        )
+
+        return paginate(query, page, per_page, self._ip_to_dict)

     def get_scan_usage(self, site_id: int) -> List[Dict[str, Any]]:
         """
@@ -470,59 +563,91 @@ class SiteService:

     # Private helper methods

-    def _add_cidr_to_site(self, site: Site, cidr_data: Dict[str, Any]) -> SiteCIDR:
-        """Helper to add CIDR during site creation."""
-        cidr = cidr_data.get('cidr')
-        if not cidr:
-            raise ValueError("CIDR 'cidr' field is required")
+    def _expand_cidr_to_ips(self, site_id: int,
+                            network: ipaddress.IPv4Network | ipaddress.IPv6Network,
+                            expected_ping: Optional[bool],
+                            expected_tcp_ports: List[int],
+                            expected_udp_ports: List[int]) -> tuple[int, List[str], List[str]]:
+        """
+        Expand a CIDR to individual IP addresses.

-        # Validate CIDR format
-        try:
-            ipaddress.ip_network(cidr, strict=False)
-        except ValueError as e:
-            raise ValueError(f"Invalid CIDR notation '{cidr}': {str(e)}")
+        Args:
+            site_id: Site ID
+            network: ipaddress network object
+            expected_ping: Default ping setting for all IPs
+            expected_tcp_ports: Default TCP ports for all IPs
+            expected_udp_ports: Default UDP ports for all IPs

-        cidr_obj = SiteCIDR(
-            site_id=site.id,
-            cidr=cidr,
-            expected_ping=cidr_data.get('expected_ping'),
-            expected_tcp_ports=json.dumps(cidr_data.get('expected_tcp_ports', [])),
-            expected_udp_ports=json.dumps(cidr_data.get('expected_udp_ports', [])),
-            created_at=datetime.utcnow()
-        )
+        Returns:
+            Tuple of (count of IPs created, list of IPs added, list of IPs skipped)
+        """
+        ip_count = 0
+        ips_added = []
+        ips_skipped = []

-        self.db.add(cidr_obj)
-        return cidr_obj
+        # For /32 or /128 (single host), use the network address
+        # For larger ranges, use hosts() to exclude network/broadcast addresses
+        if network.num_addresses == 1:
+            ip_list = [network.network_address]
+        elif network.num_addresses == 2:
+            # For /31 networks (point-to-point), both addresses are usable
+            ip_list = [network.network_address, network.broadcast_address]
+        else:
+            # Use hosts() to get usable IPs (excludes network and broadcast)
+            ip_list = list(network.hosts())
+
+        for ip in ip_list:
+            ip_str = str(ip)
+
+            # Check for duplicate
+            existing = (
+                self.db.query(SiteIP)
+                .filter(SiteIP.site_id == site_id, SiteIP.ip_address == ip_str)
+                .first()
+            )
+            if existing:
+                ips_skipped.append(ip_str)
+                continue
+
+            # Create SiteIP entry
+            ip_obj = SiteIP(
+                site_id=site_id,
+                ip_address=ip_str,
+                expected_ping=expected_ping,
+                expected_tcp_ports=json.dumps(expected_tcp_ports),
+                expected_udp_ports=json.dumps(expected_udp_ports),
+                created_at=datetime.utcnow()
+            )
+
+            self.db.add(ip_obj)
+            ips_added.append(ip_str)
+            ip_count += 1
+
+        return ip_count, ips_added, ips_skipped

     def _site_to_dict(self, site: Site) -> Dict[str, Any]:
         """Convert Site model to dictionary."""
+        # Count IPs for this site
+        ip_count = (
+            self.db.query(func.count(SiteIP.id))
+            .filter(SiteIP.site_id == site.id)
+            .scalar() or 0
+        )
+
         return {
             'id': site.id,
             'name': site.name,
             'description': site.description,
             'created_at': site.created_at.isoformat() if site.created_at else None,
             'updated_at': site.updated_at.isoformat() if site.updated_at else None,
-            'cidrs': [self._cidr_to_dict(cidr) for cidr in site.cidrs] if hasattr(site, 'cidrs') else []
+            'ip_count': ip_count
         }

-    def _cidr_to_dict(self, cidr: SiteCIDR) -> Dict[str, Any]:
-        """Convert SiteCIDR model to dictionary."""
-        return {
-            'id': cidr.id,
-            'site_id': cidr.site_id,
-            'cidr': cidr.cidr,
-            'expected_ping': cidr.expected_ping,
-            'expected_tcp_ports': json.loads(cidr.expected_tcp_ports) if cidr.expected_tcp_ports else [],
-            'expected_udp_ports': json.loads(cidr.expected_udp_ports) if cidr.expected_udp_ports else [],
-            'created_at': cidr.created_at.isoformat() if cidr.created_at else None,
-            'ip_overrides': [self._ip_override_to_dict(ip) for ip in cidr.ips] if hasattr(cidr, 'ips') else []
-        }
-
-    def _ip_override_to_dict(self, ip: SiteIP) -> Dict[str, Any]:
+    def _ip_to_dict(self, ip: SiteIP) -> Dict[str, Any]:
         """Convert SiteIP model to dictionary."""
         return {
             'id': ip.id,
-            'site_cidr_id': ip.site_cidr_id,
+            'site_id': ip.site_id,
             'ip_address': ip.ip_address,
             'expected_ping': ip.expected_ping,
             'expected_tcp_ports': json.loads(ip.expected_tcp_ports) if ip.expected_tcp_ports else [],
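To tie the pieces together, here is a minimal usage sketch of the reworked SiteService API from the diff above; the module paths and session factory are assumptions, since this excerpt does not show them:

# Assumed imports; the actual module locations are not visible in this diff.
from web.database import SessionLocal
from web.services.site_service import SiteService

db = SessionLocal()
svc = SiteService(db)

# Sites are now created empty and populated with individual IPs afterwards.
site = svc.create_site("branch-office", description="Branch office LAN")

# Expand a small CIDR into per-IP rows that share the same expectations.
result = svc.bulk_add_ips_from_cidr(
    site_id=site["id"],
    cidr="192.0.2.0/29",
    expected_ping=True,
    expected_tcp_ports=[22, 443],
)
print(result["ip_count"], "IPs added,", len(result["ips_skipped"]), "skipped")

# A single host can also be added and tuned on its own.
extra = svc.add_standalone_ip(site["id"], "198.51.100.10", expected_ping=False)
svc.update_ip_settings(site["id"], extra["id"], expected_tcp_ports=[22])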