"""Add reusable site definitions Revision ID: 006 Revises: 005 Create Date: 2025-11-19 This migration introduces reusable site definitions that can be shared across multiple scans. Sites are defined once with CIDR ranges and can be referenced in multiple scan configurations. """ from alembic import op import sqlalchemy as sa from sqlalchemy import text # revision identifiers, used by Alembic revision = '006' down_revision = '005' branch_labels = None depends_on = None def upgrade(): """ Create new site tables and migrate existing scan_sites data to the new structure. """ # Create sites table (master site definitions) op.create_table('sites', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('name', sa.String(length=255), nullable=False, comment='Unique site name'), sa.Column('description', sa.Text(), nullable=True, comment='Site description'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='Site creation time'), sa.Column('updated_at', sa.DateTime(), nullable=False, comment='Last modification time'), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('name', name='uix_site_name') ) op.create_index(op.f('ix_sites_name'), 'sites', ['name'], unique=True) # Create site_cidrs table (CIDR ranges for each site) op.create_table('site_cidrs', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('site_id', sa.Integer(), nullable=False, comment='FK to sites'), sa.Column('cidr', sa.String(length=45), nullable=False, comment='CIDR notation (e.g., 10.0.0.0/24)'), sa.Column('expected_ping', sa.Boolean(), nullable=True, comment='Expected ping response for this CIDR'), sa.Column('expected_tcp_ports', sa.Text(), nullable=True, comment='JSON array of expected TCP ports'), sa.Column('expected_udp_ports', sa.Text(), nullable=True, comment='JSON array of expected UDP ports'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='CIDR creation time'), sa.ForeignKeyConstraint(['site_id'], ['sites.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('site_id', 'cidr', name='uix_site_cidr') ) op.create_index(op.f('ix_site_cidrs_site_id'), 'site_cidrs', ['site_id'], unique=False) # Create site_ips table (IP-level overrides within CIDRs) op.create_table('site_ips', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('site_cidr_id', sa.Integer(), nullable=False, comment='FK to site_cidrs'), sa.Column('ip_address', sa.String(length=45), nullable=False, comment='IPv4 or IPv6 address'), sa.Column('expected_ping', sa.Boolean(), nullable=True, comment='Override ping expectation for this IP'), sa.Column('expected_tcp_ports', sa.Text(), nullable=True, comment='JSON array of expected TCP ports (overrides CIDR)'), sa.Column('expected_udp_ports', sa.Text(), nullable=True, comment='JSON array of expected UDP ports (overrides CIDR)'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='IP override creation time'), sa.ForeignKeyConstraint(['site_cidr_id'], ['site_cidrs.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('site_cidr_id', 'ip_address', name='uix_site_cidr_ip') ) op.create_index(op.f('ix_site_ips_site_cidr_id'), 'site_ips', ['site_cidr_id'], unique=False) # Create scan_site_associations table (many-to-many between scans and sites) op.create_table('scan_site_associations', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('site_id', sa.Integer(), nullable=False, comment='FK to sites'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='Association creation time'), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.ForeignKeyConstraint(['site_id'], ['sites.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('scan_id', 'site_id', name='uix_scan_site') ) op.create_index(op.f('ix_scan_site_associations_scan_id'), 'scan_site_associations', ['scan_id'], unique=False) op.create_index(op.f('ix_scan_site_associations_site_id'), 'scan_site_associations', ['site_id'], unique=False) # Migrate existing data connection = op.get_bind() # 1. Extract unique site names from existing scan_sites and create master Site records # This groups all historical scan sites by name and creates one master site per unique name connection.execute(text(""" INSERT INTO sites (name, description, created_at, updated_at) SELECT DISTINCT site_name, 'Migrated from scan_sites' as description, datetime('now') as created_at, datetime('now') as updated_at FROM scan_sites WHERE site_name NOT IN (SELECT name FROM sites) """)) # 2. Create scan_site_associations linking scans to their sites # This maintains the historical relationship between scans and the sites they used connection.execute(text(""" INSERT INTO scan_site_associations (scan_id, site_id, created_at) SELECT DISTINCT ss.scan_id, s.id as site_id, datetime('now') as created_at FROM scan_sites ss INNER JOIN sites s ON s.name = ss.site_name WHERE NOT EXISTS ( SELECT 1 FROM scan_site_associations ssa WHERE ssa.scan_id = ss.scan_id AND ssa.site_id = s.id ) """)) # 3. For each migrated site, create a CIDR entry from the IPs in scan_ips # Since historical data has individual IPs, we'll create /32 CIDRs for each unique IP # This preserves the exact IP addresses while fitting them into the new CIDR-based model connection.execute(text(""" INSERT INTO site_cidrs (site_id, cidr, expected_ping, expected_tcp_ports, expected_udp_ports, created_at) SELECT DISTINCT s.id as site_id, si.ip_address || '/32' as cidr, si.ping_expected, '[]' as expected_tcp_ports, '[]' as expected_udp_ports, datetime('now') as created_at FROM scan_ips si INNER JOIN scan_sites ss ON ss.id = si.site_id INNER JOIN sites s ON s.name = ss.site_name WHERE NOT EXISTS ( SELECT 1 FROM site_cidrs sc WHERE sc.site_id = s.id AND sc.cidr = si.ip_address || '/32' ) GROUP BY s.id, si.ip_address, si.ping_expected """)) print("✓ Migration complete: Reusable sites created from historical scan data") print(f" - Created {connection.execute(text('SELECT COUNT(*) FROM sites')).scalar()} master site(s)") print(f" - Created {connection.execute(text('SELECT COUNT(*) FROM site_cidrs')).scalar()} CIDR range(s)") print(f" - Created {connection.execute(text('SELECT COUNT(*) FROM scan_site_associations')).scalar()} scan-site association(s)") def downgrade(): """Remove reusable site tables.""" # Drop tables in reverse order of creation (respecting foreign keys) op.drop_index(op.f('ix_scan_site_associations_site_id'), table_name='scan_site_associations') op.drop_index(op.f('ix_scan_site_associations_scan_id'), table_name='scan_site_associations') op.drop_table('scan_site_associations') op.drop_index(op.f('ix_site_ips_site_cidr_id'), table_name='site_ips') op.drop_table('site_ips') op.drop_index(op.f('ix_site_cidrs_site_id'), table_name='site_cidrs') op.drop_table('site_cidrs') op.drop_index(op.f('ix_sites_name'), table_name='sites') op.drop_table('sites') print("✓ Downgrade complete: Reusable site tables removed")