Files
SneakyScan/app/init_db.py

350 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Database initialization script for SneakyScanner.
This script:
1. Creates the database schema using Alembic migrations
2. Initializes default settings
3. Optionally sets up an initial admin password
Usage:
python3 init_db.py [--db-url URL] [--password PASSWORD] [--verify-only] [--no-migrations] [--force]
"""
import argparse
import os
import sys
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timezone
from web.models import Base, AlertRule
from web.utils.settings import PasswordManager, SettingsManager
def init_default_alert_rules(session):
    """
    Seed the alert rule table with the built-in default rules.

    Nothing is written when at least one alert rule is already stored.
    Otherwise every default rule is added and committed together; if anything
    fails the session is rolled back and the exception is re-raised.

    Args:
        session: Database session
    """
    print("Initializing default alert rules...")

    # Seeding only happens on an empty alert rule table
    rule_count = session.query(AlertRule).count()
    if rule_count > 0:
        print(f" Alert rules already exist ({rule_count} rules), skipping...")
        return

    # (name, rule_type, threshold, severity) -- all remaining columns share
    # the same value for every default rule and are filled in below.
    rule_specs = (
        ('Unexpected Port Detection', 'unexpected_port', None, 'warning'),
        # No threshold means alert on any drift
        ('Drift Detection', 'drift_detection', None, 'info'),
        # Alert when certs expire in 30 days
        ('Certificate Expiry Warning', 'cert_expiry', 30, 'warning'),
        ('Weak TLS Detection', 'weak_tls', None, 'warning'),
        ('Host Down Detection', 'ping_failed', None, 'critical'),
    )

    try:
        for name, rule_type, threshold, severity in rule_specs:
            rule = AlertRule(
                name=name,
                rule_type=rule_type,
                enabled=True,
                threshold=threshold,
                email_enabled=False,
                webhook_enabled=False,
                severity=severity,
                filter_conditions=None,
                config_file=None,
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
            session.add(rule)
            print(f" ✓ Created rule: {rule.name}")
        # One commit for the whole batch
        session.commit()
        print(f"✓ Created {len(rule_specs)} default alert rules")
    except Exception as exc:
        print(f"✗ Failed to create default alert rules: {exc}")
        session.rollback()
        raise
def init_database(db_url: str = "sqlite:///./sneakyscanner.db", run_migrations: bool = True):
    """
    Initialize the database schema, default settings and default alert rules.

    Args:
        db_url: Database URL (defaults to SQLite in current directory)
        run_migrations: Whether to run Alembic migrations (True) or create all
            tables directly (False). When the migration run fails, direct
            table creation is used as a fallback.

    Returns:
        True once initialization has completed.

    Raises:
        Exception: Re-raised (after a session rollback) when initializing the
            default settings or alert rules fails.
    """
    print(f"Initializing SneakyScanner database at: {db_url}")

    # Create database directory if it doesn't exist (for SQLite)
    if db_url.startswith('sqlite:///'):
        db_path = db_url.replace('sqlite:///', '')
        db_dir = Path(db_path).parent
        if not db_dir.exists():
            print(f"Creating database directory: {db_dir}")
            db_dir.mkdir(parents=True, exist_ok=True)

    if run_migrations:
        # Run Alembic migrations
        print("Running Alembic migrations...")
        alembic_cfg = Config("alembic.ini")
        # The option value goes through ConfigParser interpolation, so a raw
        # '%' (e.g. from a URL-encoded password) must be escaped as '%%' or
        # set_main_option() raises before the migration is even attempted.
        alembic_cfg.set_main_option("sqlalchemy.url", db_url.replace('%', '%%'))
        try:
            # Upgrade to head (latest migration)
            command.upgrade(alembic_cfg, "head")
            print("✓ Database schema created successfully via migrations")
        except Exception as e:
            print(f"✗ Migration failed: {e}")
            print("Falling back to direct table creation...")
            run_migrations = False

    # A single engine serves both the fallback table creation and the
    # settings session; it is disposed on every exit path.
    engine = create_engine(db_url, echo=False)
    try:
        if not run_migrations:
            # Create tables directly using SQLAlchemy (fallback or if migrations disabled)
            print("Creating database schema directly...")
            Base.metadata.create_all(engine)
            print("✓ Database schema created successfully")

        # Initialize settings
        print("\nInitializing default settings...")
        Session = sessionmaker(bind=engine)
        session = Session()
        try:
            settings_manager = SettingsManager(session)
            settings_manager.init_defaults()
            print("✓ Default settings initialized")
            # Initialize default alert rules
            init_default_alert_rules(session)
        except Exception as e:
            print(f"✗ Failed to initialize settings: {e}")
            session.rollback()
            raise
        finally:
            session.close()
    finally:
        engine.dispose()

    print("\n✓ Database initialization complete!")
    return True
def set_password(db_url: str, password: str):
    """
    Set the application password.

    Opens a short-lived session on the given database and hands the password
    to PasswordManager.set_app_password together with a SettingsManager bound
    to that session.

    Args:
        db_url: Database URL
        password: Password to set

    Raises:
        Exception: Re-raised (after a session rollback) when the password
            could not be stored.
    """
    print("Setting application password...")
    engine = create_engine(db_url, echo=False)
    try:
        Session = sessionmaker(bind=engine)
        session = Session()
        try:
            settings_manager = SettingsManager(session)
            PasswordManager.set_app_password(settings_manager, password)
            print("✓ Password set successfully")
        except Exception as e:
            print(f"✗ Failed to set password: {e}")
            session.rollback()
            raise
        finally:
            session.close()
    finally:
        # The engine is created per call, so release its connection pool too
        engine.dispose()
def verify_database(db_url: str):
    """
    Verify database schema and settings.

    Counts the rows of the settings table as an accessibility check and then
    prints all settings, sorted by key, as returned by
    SettingsManager.get_all(decrypt=False, sanitize=True).

    Args:
        db_url: Database URL

    Raises:
        Exception: Re-raised when the settings table cannot be queried or the
            settings cannot be read.
    """
    print("\nVerifying database...")
    engine = create_engine(db_url, echo=False)
    try:
        Session = sessionmaker(bind=engine)
        session = Session()
        try:
            # Check if tables exist by querying settings.
            # Imported here so an import failure is reported as a
            # verification failure as well.
            from web.models import Setting
            count = session.query(Setting).count()
            print(f"✓ Settings table accessible ({count} settings found)")

            # Display current settings (sanitized)
            settings_manager = SettingsManager(session)
            settings = settings_manager.get_all(decrypt=False, sanitize=True)
            print("\nCurrent settings:")
            for key, value in sorted(settings.items()):
                print(f" {key}: {value}")
        except Exception as e:
            print(f"✗ Database verification failed: {e}")
            raise
        finally:
            session.close()
    finally:
        # The engine is created per call, so release its connection pool too
        engine.dispose()
def main():
    """
    Main entry point for database initialization.

    Parses the command line, asks for confirmation before touching an
    existing SQLite database file (unless --force or --verify-only is given),
    then either verifies the database or initializes it, optionally sets the
    application password and verifies the result.

    Exits the process with status 1 when verification or initialization
    raises. Returns None when the user declines the confirmation prompt.
    """
    parser = argparse.ArgumentParser(
        description="Initialize SneakyScanner database",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Initialize database with default settings
python3 init_db.py
# Initialize and set password
python3 init_db.py --password mysecretpassword
# Use custom database URL
python3 init_db.py --db-url postgresql://user:pass@localhost/sneakyscanner
# Force initialization without prompting (for Docker/scripts)
python3 init_db.py --force --password mysecret
# Verify existing database
python3 init_db.py --verify-only
"""
    )
    parser.add_argument(
        '--db-url',
        default='sqlite:///./sneakyscanner.db',
        help='Database URL (default: sqlite:///./sneakyscanner.db)'
    )
    parser.add_argument(
        '--password',
        help='Set application password'
    )
    parser.add_argument(
        '--verify-only',
        action='store_true',
        help='Only verify database, do not initialize'
    )
    parser.add_argument(
        '--no-migrations',
        action='store_true',
        help='Create tables directly instead of using migrations'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        help='Force initialization without prompting (for non-interactive environments)'
    )
    args = parser.parse_args()

    # Check if database already exists (only detectable for SQLite file URLs)
    db_exists = False
    if args.db_url.startswith('sqlite:///'):
        db_path = args.db_url.replace('sqlite:///', '')
        db_exists = Path(db_path).exists()

    if db_exists and not args.verify_only and not args.force:
        try:
            response = input(f"\nDatabase already exists at {db_path}. Reinitialize? (y/N): ")
        except EOFError:
            # No interactive stdin (closed pipe, container, cron): fall back
            # to the prompt's default answer "N" instead of crashing with a
            # traceback. Non-interactive callers are expected to pass --force.
            print()
            response = ''
        # Tolerate surrounding whitespace and accept the spelled-out "yes"
        if response.strip().lower() not in ('y', 'yes'):
            print("Aborting.")
            return

    try:
        if args.verify_only:
            verify_database(args.db_url)
        else:
            # Initialize database
            init_database(args.db_url, run_migrations=not args.no_migrations)
            # Set password if provided
            if args.password:
                set_password(args.db_url, args.password)
            # Verify
            verify_database(args.db_url)
            print("\n✓ All done! Database is ready to use.")
            if not args.password and not args.verify_only:
                print("\n⚠ WARNING: No password set. Run with --password to set one:")
                print(f" python3 init_db.py --db-url {args.db_url} --password YOUR_PASSWORD")
    except Exception as e:
        print(f"\n✗ Initialization failed: {e}")
        sys.exit(1)
# Run the initializer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()