"""Initial database schema for SneakyScanner Revision ID: 001 Revises: Create Date: 2025-11-13 18:00:00.000000 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '001' down_revision = None branch_labels = None depends_on = None def upgrade() -> None: """Create all initial tables for SneakyScanner.""" # Create schedules table first (referenced by scans) op.create_table('schedules', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('name', sa.String(length=255), nullable=False, comment='Schedule name (e.g., \'Daily prod scan\')'), sa.Column('config_file', sa.Text(), nullable=False, comment='Path to YAML config'), sa.Column('cron_expression', sa.String(length=100), nullable=False, comment='Cron-like schedule (e.g., \'0 2 * * *\')'), sa.Column('enabled', sa.Boolean(), nullable=False, comment='Is schedule active?'), sa.Column('last_run', sa.DateTime(), nullable=True, comment='Last execution time'), sa.Column('next_run', sa.DateTime(), nullable=True, comment='Next scheduled execution'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='Schedule creation time'), sa.Column('updated_at', sa.DateTime(), nullable=False, comment='Last modification time'), sa.PrimaryKeyConstraint('id') ) # Create scans table op.create_table('scans', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('timestamp', sa.DateTime(), nullable=False, comment='Scan start time (UTC)'), sa.Column('duration', sa.Float(), nullable=True, comment='Total scan duration in seconds'), sa.Column('status', sa.String(length=20), nullable=False, comment='running, completed, failed'), sa.Column('config_file', sa.Text(), nullable=True, comment='Path to YAML config used'), sa.Column('title', sa.Text(), nullable=True, comment='Scan title from config'), sa.Column('json_path', sa.Text(), nullable=True, comment='Path to JSON report'), sa.Column('html_path', sa.Text(), nullable=True, comment='Path to HTML report'), sa.Column('zip_path', sa.Text(), nullable=True, comment='Path to ZIP archive'), sa.Column('screenshot_dir', sa.Text(), nullable=True, comment='Path to screenshot directory'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='Record creation time'), sa.Column('triggered_by', sa.String(length=50), nullable=False, comment='manual, scheduled, api'), sa.Column('schedule_id', sa.Integer(), nullable=True, comment='FK to schedules if triggered by schedule'), sa.ForeignKeyConstraint(['schedule_id'], ['schedules.id'], ), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_scans_timestamp'), 'scans', ['timestamp'], unique=False) # Create scan_sites table op.create_table('scan_sites', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('site_name', sa.String(length=255), nullable=False, comment='Site name from config'), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_scan_sites_scan_id'), 'scan_sites', ['scan_id'], unique=False) # Create scan_ips table op.create_table('scan_ips', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('site_id', sa.Integer(), nullable=False, comment='FK to scan_sites'), sa.Column('ip_address', sa.String(length=45), nullable=False, comment='IPv4 or IPv6 address'), sa.Column('ping_expected', sa.Boolean(), nullable=True, comment='Expected ping response'), sa.Column('ping_actual', sa.Boolean(), nullable=True, comment='Actual ping response'), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.ForeignKeyConstraint(['site_id'], ['scan_sites.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('scan_id', 'ip_address', name='uix_scan_ip') ) op.create_index(op.f('ix_scan_ips_scan_id'), 'scan_ips', ['scan_id'], unique=False) op.create_index(op.f('ix_scan_ips_site_id'), 'scan_ips', ['site_id'], unique=False) # Create scan_ports table op.create_table('scan_ports', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('ip_id', sa.Integer(), nullable=False, comment='FK to scan_ips'), sa.Column('port', sa.Integer(), nullable=False, comment='Port number (1-65535)'), sa.Column('protocol', sa.String(length=10), nullable=False, comment='tcp or udp'), sa.Column('expected', sa.Boolean(), nullable=True, comment='Was this port expected?'), sa.Column('state', sa.String(length=20), nullable=False, comment='open, closed, filtered'), sa.ForeignKeyConstraint(['ip_id'], ['scan_ips.id'], ), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('scan_id', 'ip_id', 'port', 'protocol', name='uix_scan_ip_port') ) op.create_index(op.f('ix_scan_ports_ip_id'), 'scan_ports', ['ip_id'], unique=False) op.create_index(op.f('ix_scan_ports_scan_id'), 'scan_ports', ['scan_id'], unique=False) # Create scan_services table op.create_table('scan_services', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('port_id', sa.Integer(), nullable=False, comment='FK to scan_ports'), sa.Column('service_name', sa.String(length=100), nullable=True, comment='Service name (e.g., ssh, http)'), sa.Column('product', sa.String(length=255), nullable=True, comment='Product name (e.g., OpenSSH)'), sa.Column('version', sa.String(length=100), nullable=True, comment='Version string'), sa.Column('extrainfo', sa.Text(), nullable=True, comment='Additional nmap info'), sa.Column('ostype', sa.String(length=100), nullable=True, comment='OS type if detected'), sa.Column('http_protocol', sa.String(length=10), nullable=True, comment='http or https (if web service)'), sa.Column('screenshot_path', sa.Text(), nullable=True, comment='Relative path to screenshot'), sa.ForeignKeyConstraint(['port_id'], ['scan_ports.id'], ), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_scan_services_port_id'), 'scan_services', ['port_id'], unique=False) op.create_index(op.f('ix_scan_services_scan_id'), 'scan_services', ['scan_id'], unique=False) # Create scan_certificates table op.create_table('scan_certificates', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('service_id', sa.Integer(), nullable=False, comment='FK to scan_services'), sa.Column('subject', sa.Text(), nullable=True, comment='Certificate subject (CN)'), sa.Column('issuer', sa.Text(), nullable=True, comment='Certificate issuer'), sa.Column('serial_number', sa.Text(), nullable=True, comment='Serial number'), sa.Column('not_valid_before', sa.DateTime(), nullable=True, comment='Validity start date'), sa.Column('not_valid_after', sa.DateTime(), nullable=True, comment='Validity end date'), sa.Column('days_until_expiry', sa.Integer(), nullable=True, comment='Days until expiration'), sa.Column('sans', sa.Text(), nullable=True, comment='JSON array of SANs'), sa.Column('is_self_signed', sa.Boolean(), nullable=True, comment='Self-signed certificate flag'), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.ForeignKeyConstraint(['service_id'], ['scan_services.id'], ), sa.PrimaryKeyConstraint('id'), comment='Index on expiration date for alert queries' ) op.create_index(op.f('ix_scan_certificates_scan_id'), 'scan_certificates', ['scan_id'], unique=False) op.create_index(op.f('ix_scan_certificates_service_id'), 'scan_certificates', ['service_id'], unique=False) # Create scan_tls_versions table op.create_table('scan_tls_versions', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('certificate_id', sa.Integer(), nullable=False, comment='FK to scan_certificates'), sa.Column('tls_version', sa.String(length=20), nullable=False, comment='TLS 1.0, TLS 1.1, TLS 1.2, TLS 1.3'), sa.Column('supported', sa.Boolean(), nullable=False, comment='Is this version supported?'), sa.Column('cipher_suites', sa.Text(), nullable=True, comment='JSON array of cipher suites'), sa.ForeignKeyConstraint(['certificate_id'], ['scan_certificates.id'], ), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_scan_tls_versions_certificate_id'), 'scan_tls_versions', ['certificate_id'], unique=False) op.create_index(op.f('ix_scan_tls_versions_scan_id'), 'scan_tls_versions', ['scan_id'], unique=False) # Create alerts table op.create_table('alerts', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('scan_id', sa.Integer(), nullable=False, comment='FK to scans'), sa.Column('alert_type', sa.String(length=50), nullable=False, comment='new_port, cert_expiry, service_change, ping_failed'), sa.Column('severity', sa.String(length=20), nullable=False, comment='info, warning, critical'), sa.Column('message', sa.Text(), nullable=False, comment='Human-readable alert message'), sa.Column('ip_address', sa.String(length=45), nullable=True, comment='Related IP (optional)'), sa.Column('port', sa.Integer(), nullable=True, comment='Related port (optional)'), sa.Column('email_sent', sa.Boolean(), nullable=False, comment='Was email notification sent?'), sa.Column('email_sent_at', sa.DateTime(), nullable=True, comment='Email send timestamp'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='Alert creation time'), sa.ForeignKeyConstraint(['scan_id'], ['scans.id'], ), sa.PrimaryKeyConstraint('id'), comment='Indexes for alert filtering' ) op.create_index(op.f('ix_alerts_scan_id'), 'alerts', ['scan_id'], unique=False) # Create alert_rules table op.create_table('alert_rules', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('rule_type', sa.String(length=50), nullable=False, comment='unexpected_port, cert_expiry, service_down, etc.'), sa.Column('enabled', sa.Boolean(), nullable=False, comment='Is rule active?'), sa.Column('threshold', sa.Integer(), nullable=True, comment='Threshold value (e.g., days for cert expiry)'), sa.Column('email_enabled', sa.Boolean(), nullable=False, comment='Send email for this rule?'), sa.Column('created_at', sa.DateTime(), nullable=False, comment='Rule creation time'), sa.PrimaryKeyConstraint('id') ) # Create settings table op.create_table('settings', sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), sa.Column('key', sa.String(length=255), nullable=False, comment='Setting key (e.g., smtp_server)'), sa.Column('value', sa.Text(), nullable=True, comment='Setting value (JSON for complex values)'), sa.Column('updated_at', sa.DateTime(), nullable=False, comment='Last modification time'), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('key') ) op.create_index(op.f('ix_settings_key'), 'settings', ['key'], unique=True) def downgrade() -> None: """Drop all tables.""" op.drop_index(op.f('ix_settings_key'), table_name='settings') op.drop_table('settings') op.drop_table('alert_rules') op.drop_index(op.f('ix_alerts_scan_id'), table_name='alerts') op.drop_table('alerts') op.drop_index(op.f('ix_scan_tls_versions_scan_id'), table_name='scan_tls_versions') op.drop_index(op.f('ix_scan_tls_versions_certificate_id'), table_name='scan_tls_versions') op.drop_table('scan_tls_versions') op.drop_index(op.f('ix_scan_certificates_service_id'), table_name='scan_certificates') op.drop_index(op.f('ix_scan_certificates_scan_id'), table_name='scan_certificates') op.drop_table('scan_certificates') op.drop_index(op.f('ix_scan_services_scan_id'), table_name='scan_services') op.drop_index(op.f('ix_scan_services_port_id'), table_name='scan_services') op.drop_table('scan_services') op.drop_index(op.f('ix_scan_ports_scan_id'), table_name='scan_ports') op.drop_index(op.f('ix_scan_ports_ip_id'), table_name='scan_ports') op.drop_table('scan_ports') op.drop_index(op.f('ix_scan_ips_site_id'), table_name='scan_ips') op.drop_index(op.f('ix_scan_ips_scan_id'), table_name='scan_ips') op.drop_table('scan_ips') op.drop_index(op.f('ix_scan_sites_scan_id'), table_name='scan_sites') op.drop_table('scan_sites') op.drop_index(op.f('ix_scans_timestamp'), table_name='scans') op.drop_table('scans') op.drop_table('schedules')