SneakyScan/app/migrations/versions/008_expand_cidrs_to_ips.py
Phillip Tarrant 0ec338e252 Migrate from file-based configs to database with per-IP site configuration
Major architectural changes:
   - Replace YAML config files with database-stored ScanConfig model
   - Remove CIDR block support in favor of individual IP addresses per site
   - Each IP now has its own expected_ping, expected_tcp_ports, expected_udp_ports
   - AlertRule now uses config_id FK instead of config_file string

   API changes:
   - POST /api/scans now requires config_id instead of config_file (see the request sketch below)
   - Alert rules API uses config_id with validation
   - All config dropdowns fetch from /api/configs dynamically

   Template updates:
   - scans.html, dashboard.html, alert_rules.html load configs via API
   - Display format: Config Title (X sites) in dropdowns
   - Removed Jinja2 config_files loops

   Migrations:
   - 008: Expand CIDRs to individual IPs with per-IP port configs
   - 009: Remove CIDR-related columns
   - 010: Add config_id to alert_rules, remove config_file
2025-11-19 19:40:34 -06:00
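As a quick illustration of the API change described above, here is a minimal sketch of the new scan-creation request. It assumes a JSON body and a local development server; the URL, port, and config_id value are placeholders, not taken from the codebase.

import requests

resp = requests.post(
    "http://localhost:5000/api/scans",   # hypothetical host/port
    json={"config_id": 1},               # previously something like {"config_file": "some_site.yaml"}
    timeout=10,
)
resp.raise_for_status()
print(resp.json())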


"""Expand CIDRs to individual IPs with per-IP settings
Revision ID: 008
Revises: 007
Create Date: 2025-11-19
This migration changes the site architecture to automatically expand CIDRs into
individual IPs in the database. Each IP has its own port and ping settings.
Changes:
- Add site_id to site_ips (direct link to sites, support standalone IPs)
- Make site_cidr_id nullable (IPs can exist without a CIDR parent)
- Remove settings from site_cidrs (settings now only at IP level)
- Add unique constraint: no duplicate IPs within a site
- Expand existing CIDRs to individual IPs
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text
import ipaddress
# revision identifiers, used by Alembic
revision = '008'
down_revision = '007'
branch_labels = None
depends_on = None


def upgrade():
"""
Modify schema to support per-IP settings and auto-expand CIDRs.
"""
    connection = op.get_bind()

    # Check if site_id column already exists
    inspector = sa.inspect(connection)
    site_ips_columns = [col['name'] for col in inspector.get_columns('site_ips')]
    site_cidrs_columns = [col['name'] for col in inspector.get_columns('site_cidrs')]

    # Step 1: Add site_id column to site_ips (will be populated from site_cidr_id)
    if 'site_id' not in site_ips_columns:
        print("Adding site_id column to site_ips...")
        op.add_column('site_ips', sa.Column('site_id', sa.Integer(), nullable=True, comment='FK to sites (direct link)'))
    else:
        print("site_id column already exists in site_ips, skipping...")

    # Step 2: Populate site_id from site_cidr_id (before we make it nullable)
    print("Populating site_id from existing site_cidr relationships...")
    connection.execute(text("""
        UPDATE site_ips
        SET site_id = (
            SELECT site_id
            FROM site_cidrs
            WHERE site_cidrs.id = site_ips.site_cidr_id
        )
        WHERE site_cidr_id IS NOT NULL
    """))
    # Step 3: Make site_id NOT NULL and add foreign key
    try:
        op.alter_column('site_ips', 'site_id', nullable=False)
        print("Made site_id NOT NULL")
    except Exception as e:
        print(f"site_id already NOT NULL or error: {e}")

    # Check if foreign key exists
    try:
        op.create_foreign_key('fk_site_ips_site_id', 'site_ips', 'sites', ['site_id'], ['id'])
        print("Created foreign key fk_site_ips_site_id")
    except Exception as e:
        print(f"Foreign key already exists or error: {e}")

    # Check if index exists
    try:
        op.create_index(op.f('ix_site_ips_site_id'), 'site_ips', ['site_id'], unique=False)
        print("Created index ix_site_ips_site_id")
    except Exception as e:
        print(f"Index already exists or error: {e}")

    # Step 4: Make site_cidr_id nullable (for standalone IPs)
    try:
        op.alter_column('site_ips', 'site_cidr_id', nullable=True)
        print("Made site_cidr_id nullable")
    except Exception as e:
        print(f"site_cidr_id already nullable or error: {e}")

    # Step 5: Drop old unique constraint and create new one on (site_id, ip_address)
    # This prevents duplicate IPs within a site (across all CIDRs and standalone)
    try:
        op.drop_constraint('uix_site_cidr_ip', 'site_ips', type_='unique')
        print("Dropped old constraint uix_site_cidr_ip")
    except Exception as e:
        print(f"Constraint already dropped or doesn't exist: {e}")

    try:
        op.create_unique_constraint('uix_site_ip_address', 'site_ips', ['site_id', 'ip_address'])
        print("Created new constraint uix_site_ip_address")
    except Exception as e:
        print(f"Constraint already exists or error: {e}")
    # Step 6: Expand existing CIDRs to individual IPs
    print("Expanding existing CIDRs to individual IPs...")

    # Get all existing CIDRs
    cidrs = connection.execute(text("""
        SELECT id, site_id, cidr, expected_ping, expected_tcp_ports, expected_udp_ports
        FROM site_cidrs
    """)).fetchall()

    expanded_count = 0
    skipped_count = 0

    for cidr_row in cidrs:
        cidr_id, site_id, cidr_str, expected_ping, expected_tcp_ports, expected_udp_ports = cidr_row
        try:
            # Parse CIDR
            network = ipaddress.ip_network(cidr_str, strict=False)

            # Check size - skip if wider than /24 for IPv4 or /64 for IPv6
            if isinstance(network, ipaddress.IPv4Network) and network.prefixlen < 24:
                print(f" ⚠ Skipping large CIDR {cidr_str} ({network.num_addresses} IPs)")
                skipped_count += 1
                continue
            elif isinstance(network, ipaddress.IPv6Network) and network.prefixlen < 64:
                print(f" ⚠ Skipping large CIDR {cidr_str} ({network.num_addresses} IPs)")
                skipped_count += 1
                continue

            # Expand to individual IPs (/31 and /32 fall back to the network address)
            for ip in network.hosts() if network.num_addresses > 2 else [network.network_address]:
                ip_str = str(ip)

                # Check if this IP already exists (from previous IP overrides)
                existing = connection.execute(text("""
                    SELECT id FROM site_ips
                    WHERE site_cidr_id = :cidr_id AND ip_address = :ip_address
                """), {'cidr_id': cidr_id, 'ip_address': ip_str}).fetchone()

                if not existing:
                    # Insert new IP with settings from CIDR
                    connection.execute(text("""
                        INSERT INTO site_ips (
                            site_id, site_cidr_id, ip_address,
                            expected_ping, expected_tcp_ports, expected_udp_ports,
                            created_at
                        )
                        VALUES (
                            :site_id, :cidr_id, :ip_address,
                            :expected_ping, :expected_tcp_ports, :expected_udp_ports,
                            datetime('now')
                        )
                    """), {
                        'site_id': site_id,
                        'cidr_id': cidr_id,
                        'ip_address': ip_str,
                        'expected_ping': expected_ping,
                        'expected_tcp_ports': expected_tcp_ports,
                        'expected_udp_ports': expected_udp_ports
                    })
                    expanded_count += 1
        except Exception as e:
            print(f" ✗ Error expanding CIDR {cidr_str}: {e}")
            skipped_count += 1
            continue

    print(f" ✓ Expanded {expanded_count} IPs from CIDRs")
    if skipped_count > 0:
        print(f" ⚠ Skipped {skipped_count} CIDRs (too large or errors)")
    # Step 7: Remove settings columns from site_cidrs (now only at IP level)
    print("Removing settings columns from site_cidrs...")

    # Re-inspect to get current columns (fresh inspector, since reflection results are cached)
    inspector = sa.inspect(connection)
    site_cidrs_columns = [col['name'] for col in inspector.get_columns('site_cidrs')]

    if 'expected_ping' in site_cidrs_columns:
        try:
            op.drop_column('site_cidrs', 'expected_ping')
            print("Dropped expected_ping from site_cidrs")
        except Exception as e:
            print(f"Error dropping expected_ping: {e}")
    else:
        print("expected_ping already dropped from site_cidrs")

    if 'expected_tcp_ports' in site_cidrs_columns:
        try:
            op.drop_column('site_cidrs', 'expected_tcp_ports')
            print("Dropped expected_tcp_ports from site_cidrs")
        except Exception as e:
            print(f"Error dropping expected_tcp_ports: {e}")
    else:
        print("expected_tcp_ports already dropped from site_cidrs")

    if 'expected_udp_ports' in site_cidrs_columns:
        try:
            op.drop_column('site_cidrs', 'expected_udp_ports')
            print("Dropped expected_udp_ports from site_cidrs")
        except Exception as e:
            print(f"Error dropping expected_udp_ports: {e}")
    else:
        print("expected_udp_ports already dropped from site_cidrs")

    # Print summary
    total_sites = connection.execute(text('SELECT COUNT(*) FROM sites')).scalar()
    total_cidrs = connection.execute(text('SELECT COUNT(*) FROM site_cidrs')).scalar()
    total_ips = connection.execute(text('SELECT COUNT(*) FROM site_ips')).scalar()
    print("\n✓ Migration 008 complete: CIDRs expanded to individual IPs")
    print(f" - Total sites: {total_sites}")
    print(f" - Total CIDRs: {total_cidrs}")
    print(f" - Total IPs: {total_ips}")


def downgrade():
    """
    Revert schema changes (restore CIDR-level settings).
    Note: This will lose per-IP granularity!
    """
    connection = op.get_bind()
    print("Rolling back to CIDR-level settings...")

    # Step 1: Add settings columns back to site_cidrs
    op.add_column('site_cidrs', sa.Column('expected_ping', sa.Boolean(), nullable=True))
    op.add_column('site_cidrs', sa.Column('expected_tcp_ports', sa.Text(), nullable=True))
    op.add_column('site_cidrs', sa.Column('expected_udp_ports', sa.Text(), nullable=True))

    # Step 2: Populate CIDR settings from first IP in each CIDR (approximation)
    connection.execute(text("""
        UPDATE site_cidrs
        SET
            expected_ping = (
                SELECT expected_ping FROM site_ips
                WHERE site_ips.site_cidr_id = site_cidrs.id
                LIMIT 1
            ),
            expected_tcp_ports = (
                SELECT expected_tcp_ports FROM site_ips
                WHERE site_ips.site_cidr_id = site_cidrs.id
                LIMIT 1
            ),
            expected_udp_ports = (
                SELECT expected_udp_ports FROM site_ips
                WHERE site_ips.site_cidr_id = site_cidrs.id
                LIMIT 1
            )
    """))
    # Step 3: Delete auto-expanded IPs (keep only original overrides)
    # In practice, this is difficult to determine, so we'll keep all IPs
    # and just remove the schema changes

    # Step 4: Drop new unique constraint and restore old one
    op.drop_constraint('uix_site_ip_address', 'site_ips', type_='unique')
    op.create_unique_constraint('uix_site_cidr_ip', 'site_ips', ['site_cidr_id', 'ip_address'])

    # Step 5: Make site_cidr_id NOT NULL again
    op.alter_column('site_ips', 'site_cidr_id', nullable=False)

    # Step 6: Drop site_id column and related constraints
    op.drop_index(op.f('ix_site_ips_site_id'), table_name='site_ips')
    op.drop_constraint('fk_site_ips_site_id', 'site_ips', type_='foreignkey')
    op.drop_column('site_ips', 'site_id')

    print("✓ Downgrade complete: Reverted to CIDR-level settings")