Files
SneakyScan/app/migrations/versions/006_add_reusable_sites.py

162 lines
7.7 KiB
Python

"""Add reusable site definitions
Revision ID: 006
Revises: 005
Create Date: 2025-11-19
This migration introduces reusable site definitions that can be shared across
multiple scans. Sites are defined once with CIDR ranges and can be referenced
in multiple scan configurations.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text
# revision identifiers, used by Alembic
revision = '006'
down_revision = '005'
branch_labels = None
depends_on = None
def upgrade():
"""
Create new site tables and migrate existing scan_sites data to the new structure.
"""
# Create sites table (master site definitions)
op.create_table('sites',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('name', sa.String(length=255), nullable=False, comment='Unique site name'),
sa.Column('description', sa.Text(), nullable=True, comment='Site description'),
sa.Column('created_at', sa.DateTime(), nullable=False, comment='Site creation time'),
sa.Column('updated_at', sa.DateTime(), nullable=False, comment='Last modification time'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', name='uix_site_name')
)
op.create_index(op.f('ix_sites_name'), 'sites', ['name'], unique=True)
# Create site_cidrs table (CIDR ranges for each site)
op.create_table('site_cidrs',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('site_id', sa.Integer(), nullable=False, comment='FK to sites'),
sa.Column('cidr', sa.String(length=45), nullable=False, comment='CIDR notation (e.g., 10.0.0.0/24)'),
sa.Column('expected_ping', sa.Boolean(), nullable=True, comment='Expected ping response for this CIDR'),
sa.Column('expected_tcp_ports', sa.Text(), nullable=True, comment='JSON array of expected TCP ports'),
sa.Column('expected_udp_ports', sa.Text(), nullable=True, comment='JSON array of expected UDP ports'),
sa.Column('created_at', sa.DateTime(), nullable=False, comment='CIDR creation time'),
sa.ForeignKeyConstraint(['site_id'], ['sites.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('site_id', 'cidr', name='uix_site_cidr')
)
op.create_index(op.f('ix_site_cidrs_site_id'), 'site_cidrs', ['site_id'], unique=False)
# Create site_ips table (IP-level overrides within CIDRs)
op.create_table('site_ips',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('site_cidr_id', sa.Integer(), nullable=False, comment='FK to site_cidrs'),
sa.Column('ip_address', sa.String(length=45), nullable=False, comment='IPv4 or IPv6 address'),
sa.Column('expected_ping', sa.Boolean(), nullable=True, comment='Override ping expectation for this IP'),
sa.Column('expected_tcp_ports', sa.Text(), nullable=True, comment='JSON array of expected TCP ports (overrides CIDR)'),
sa.Column('expected_udp_ports', sa.Text(), nullable=True, comment='JSON array of expected UDP ports (overrides CIDR)'),
sa.Column('created_at', sa.DateTime(), nullable=False, comment='IP override creation time'),
sa.ForeignKeyConstraint(['site_cidr_id'], ['site_cidrs.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('site_cidr_id', 'ip_address', name='uix_site_cidr_ip')
)
op.create_index(op.f('ix_site_ips_site_cidr_id'), 'site_ips', ['site_cidr_id'], unique=False)
# Create scan_site_associations table (many-to-many between scans and sites)
op.create_table('scan_site_associations',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'),
sa.Column('site_id', sa.Integer(), nullable=False, comment='FK to sites'),
sa.Column('created_at', sa.DateTime(), nullable=False, comment='Association creation time'),
sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ),
sa.ForeignKeyConstraint(['site_id'], ['sites.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('scan_id', 'site_id', name='uix_scan_site')
)
op.create_index(op.f('ix_scan_site_associations_scan_id'), 'scan_site_associations', ['scan_id'], unique=False)
op.create_index(op.f('ix_scan_site_associations_site_id'), 'scan_site_associations', ['site_id'], unique=False)
# Migrate existing data
connection = op.get_bind()
# 1. Extract unique site names from existing scan_sites and create master Site records
# This groups all historical scan sites by name and creates one master site per unique name
connection.execute(text("""
INSERT INTO sites (name, description, created_at, updated_at)
SELECT DISTINCT
site_name,
'Migrated from scan_sites' as description,
datetime('now') as created_at,
datetime('now') as updated_at
FROM scan_sites
WHERE site_name NOT IN (SELECT name FROM sites)
"""))
# 2. Create scan_site_associations linking scans to their sites
# This maintains the historical relationship between scans and the sites they used
connection.execute(text("""
INSERT INTO scan_site_associations (scan_id, site_id, created_at)
SELECT DISTINCT
ss.scan_id,
s.id as site_id,
datetime('now') as created_at
FROM scan_sites ss
INNER JOIN sites s ON s.name = ss.site_name
WHERE NOT EXISTS (
SELECT 1 FROM scan_site_associations ssa
WHERE ssa.scan_id = ss.scan_id AND ssa.site_id = s.id
)
"""))
# 3. For each migrated site, create a CIDR entry from the IPs in scan_ips
# Since historical data has individual IPs, we'll create /32 CIDRs for each unique IP
# This preserves the exact IP addresses while fitting them into the new CIDR-based model
connection.execute(text("""
INSERT INTO site_cidrs (site_id, cidr, expected_ping, expected_tcp_ports, expected_udp_ports, created_at)
SELECT DISTINCT
s.id as site_id,
si.ip_address || '/32' as cidr,
si.ping_expected,
'[]' as expected_tcp_ports,
'[]' as expected_udp_ports,
datetime('now') as created_at
FROM scan_ips si
INNER JOIN scan_sites ss ON ss.id = si.site_id
INNER JOIN sites s ON s.name = ss.site_name
WHERE NOT EXISTS (
SELECT 1 FROM site_cidrs sc
WHERE sc.site_id = s.id AND sc.cidr = si.ip_address || '/32'
)
GROUP BY s.id, si.ip_address, si.ping_expected
"""))
print("✓ Migration complete: Reusable sites created from historical scan data")
print(f" - Created {connection.execute(text('SELECT COUNT(*) FROM sites')).scalar()} master site(s)")
print(f" - Created {connection.execute(text('SELECT COUNT(*) FROM site_cidrs')).scalar()} CIDR range(s)")
print(f" - Created {connection.execute(text('SELECT COUNT(*) FROM scan_site_associations')).scalar()} scan-site association(s)")
def downgrade():
"""Remove reusable site tables."""
# Drop tables in reverse order of creation (respecting foreign keys)
op.drop_index(op.f('ix_scan_site_associations_site_id'), table_name='scan_site_associations')
op.drop_index(op.f('ix_scan_site_associations_scan_id'), table_name='scan_site_associations')
op.drop_table('scan_site_associations')
op.drop_index(op.f('ix_site_ips_site_cidr_id'), table_name='site_ips')
op.drop_table('site_ips')
op.drop_index(op.f('ix_site_cidrs_site_id'), table_name='site_cidrs')
op.drop_table('site_cidrs')
op.drop_index(op.f('ix_sites_name'), table_name='sites')
op.drop_table('sites')
print("✓ Downgrade complete: Reusable site tables removed")