350 lines
10 KiB
Python
Executable File
350 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Database initialization script for SneakyScanner.
|
|
|
|
This script:
|
|
1. Creates the database schema using Alembic migrations
|
|
2. Initializes default settings
|
|
3. Optionally sets up an initial admin password
|
|
|
|
Usage:
|
|
python3 init_db.py [--password PASSWORD]
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
from alembic import command
|
|
from alembic.config import Config
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from datetime import datetime, timezone
|
|
|
|
from web.models import Base, AlertRule
|
|
from web.utils.settings import PasswordManager, SettingsManager
|
|
|
|
|
|
def init_default_alert_rules(session):
|
|
"""
|
|
Create default alert rules for Phase 5.
|
|
|
|
Args:
|
|
session: Database session
|
|
"""
|
|
print("Initializing default alert rules...")
|
|
|
|
# Check if alert rules already exist
|
|
existing_rules = session.query(AlertRule).count()
|
|
if existing_rules > 0:
|
|
print(f" Alert rules already exist ({existing_rules} rules), skipping...")
|
|
return
|
|
|
|
default_rules = [
|
|
{
|
|
'name': 'Unexpected Port Detection',
|
|
'rule_type': 'unexpected_port',
|
|
'enabled': True,
|
|
'threshold': None,
|
|
'email_enabled': False,
|
|
'webhook_enabled': False,
|
|
'severity': 'warning',
|
|
'filter_conditions': None,
|
|
'config_id': None
|
|
},
|
|
{
|
|
'name': 'Drift Detection',
|
|
'rule_type': 'drift_detection',
|
|
'enabled': True,
|
|
'threshold': None, # No threshold means alert on any drift
|
|
'email_enabled': False,
|
|
'webhook_enabled': False,
|
|
'severity': 'info',
|
|
'filter_conditions': None,
|
|
'config_id': None
|
|
},
|
|
{
|
|
'name': 'Certificate Expiry Warning',
|
|
'rule_type': 'cert_expiry',
|
|
'enabled': True,
|
|
'threshold': 30, # Alert when certs expire in 30 days
|
|
'email_enabled': False,
|
|
'webhook_enabled': False,
|
|
'severity': 'warning',
|
|
'filter_conditions': None,
|
|
'config_id': None
|
|
},
|
|
{
|
|
'name': 'Weak TLS Detection',
|
|
'rule_type': 'weak_tls',
|
|
'enabled': True,
|
|
'threshold': None,
|
|
'email_enabled': False,
|
|
'webhook_enabled': False,
|
|
'severity': 'warning',
|
|
'filter_conditions': None,
|
|
'config_id': None
|
|
},
|
|
{
|
|
'name': 'Host Down Detection',
|
|
'rule_type': 'ping_failed',
|
|
'enabled': True,
|
|
'threshold': None,
|
|
'email_enabled': False,
|
|
'webhook_enabled': False,
|
|
'severity': 'critical',
|
|
'filter_conditions': None,
|
|
'config_id': None
|
|
}
|
|
]
|
|
|
|
try:
|
|
for rule_data in default_rules:
|
|
rule = AlertRule(
|
|
name=rule_data['name'],
|
|
rule_type=rule_data['rule_type'],
|
|
enabled=rule_data['enabled'],
|
|
threshold=rule_data['threshold'],
|
|
email_enabled=rule_data['email_enabled'],
|
|
webhook_enabled=rule_data['webhook_enabled'],
|
|
severity=rule_data['severity'],
|
|
filter_conditions=rule_data['filter_conditions'],
|
|
config_id=rule_data['config_id'],
|
|
created_at=datetime.now(timezone.utc),
|
|
updated_at=datetime.now(timezone.utc)
|
|
)
|
|
session.add(rule)
|
|
print(f" ✓ Created rule: {rule.name}")
|
|
|
|
session.commit()
|
|
print(f"✓ Created {len(default_rules)} default alert rules")
|
|
|
|
except Exception as e:
|
|
print(f"✗ Failed to create default alert rules: {e}")
|
|
session.rollback()
|
|
raise
|
|
|
|
|
|
def init_database(db_url: str = "sqlite:///./sneakyscanner.db", run_migrations: bool = True):
|
|
"""
|
|
Initialize the database schema and settings.
|
|
|
|
Args:
|
|
db_url: Database URL (defaults to SQLite in current directory)
|
|
run_migrations: Whether to run Alembic migrations (True) or create all tables directly (False)
|
|
"""
|
|
print(f"Initializing SneakyScanner database at: {db_url}")
|
|
|
|
# Create database directory if it doesn't exist (for SQLite)
|
|
if db_url.startswith('sqlite:///'):
|
|
db_path = db_url.replace('sqlite:///', '')
|
|
db_dir = Path(db_path).parent
|
|
if not db_dir.exists():
|
|
print(f"Creating database directory: {db_dir}")
|
|
db_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if run_migrations:
|
|
# Run Alembic migrations
|
|
print("Running Alembic migrations...")
|
|
alembic_cfg = Config("alembic.ini")
|
|
alembic_cfg.set_main_option("sqlalchemy.url", db_url)
|
|
|
|
try:
|
|
# Upgrade to head (latest migration)
|
|
command.upgrade(alembic_cfg, "head")
|
|
print("✓ Database schema created successfully via migrations")
|
|
except Exception as e:
|
|
print(f"✗ Migration failed: {e}")
|
|
print("Falling back to direct table creation...")
|
|
run_migrations = False
|
|
|
|
if not run_migrations:
|
|
# Create tables directly using SQLAlchemy (fallback or if migrations disabled)
|
|
print("Creating database schema directly...")
|
|
engine = create_engine(db_url, echo=False)
|
|
Base.metadata.create_all(engine)
|
|
print("✓ Database schema created successfully")
|
|
|
|
# Initialize settings
|
|
print("\nInitializing default settings...")
|
|
engine = create_engine(db_url, echo=False)
|
|
Session = sessionmaker(bind=engine)
|
|
session = Session()
|
|
|
|
try:
|
|
settings_manager = SettingsManager(session)
|
|
settings_manager.init_defaults()
|
|
print("✓ Default settings initialized")
|
|
|
|
# Initialize default alert rules
|
|
init_default_alert_rules(session)
|
|
|
|
except Exception as e:
|
|
print(f"✗ Failed to initialize settings: {e}")
|
|
session.rollback()
|
|
raise
|
|
finally:
|
|
session.close()
|
|
|
|
print("\n✓ Database initialization complete!")
|
|
return True
|
|
|
|
|
|
def set_password(db_url: str, password: str):
|
|
"""
|
|
Set the application password.
|
|
|
|
Args:
|
|
db_url: Database URL
|
|
password: Password to set
|
|
"""
|
|
print("Setting application password...")
|
|
|
|
engine = create_engine(db_url, echo=False)
|
|
Session = sessionmaker(bind=engine)
|
|
session = Session()
|
|
|
|
try:
|
|
settings_manager = SettingsManager(session)
|
|
PasswordManager.set_app_password(settings_manager, password)
|
|
print("✓ Password set successfully")
|
|
except Exception as e:
|
|
print(f"✗ Failed to set password: {e}")
|
|
session.rollback()
|
|
raise
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def verify_database(db_url: str):
|
|
"""
|
|
Verify database schema and settings.
|
|
|
|
Args:
|
|
db_url: Database URL
|
|
"""
|
|
print("\nVerifying database...")
|
|
|
|
engine = create_engine(db_url, echo=False)
|
|
Session = sessionmaker(bind=engine)
|
|
session = Session()
|
|
|
|
try:
|
|
# Check if tables exist by querying settings
|
|
from web.models import Setting
|
|
count = session.query(Setting).count()
|
|
print(f"✓ Settings table accessible ({count} settings found)")
|
|
|
|
# Display current settings (sanitized)
|
|
settings_manager = SettingsManager(session)
|
|
settings = settings_manager.get_all(decrypt=False, sanitize=True)
|
|
print("\nCurrent settings:")
|
|
for key, value in sorted(settings.items()):
|
|
print(f" {key}: {value}")
|
|
|
|
except Exception as e:
|
|
print(f"✗ Database verification failed: {e}")
|
|
raise
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
def main():
|
|
"""Main entry point for database initialization."""
|
|
parser = argparse.ArgumentParser(
|
|
description="Initialize SneakyScanner database",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Initialize database with default settings
|
|
python3 init_db.py
|
|
|
|
# Initialize and set password
|
|
python3 init_db.py --password mysecretpassword
|
|
|
|
# Use custom database URL
|
|
python3 init_db.py --db-url postgresql://user:pass@localhost/sneakyscanner
|
|
|
|
# Force initialization without prompting (for Docker/scripts)
|
|
python3 init_db.py --force --password mysecret
|
|
|
|
# Verify existing database
|
|
python3 init_db.py --verify-only
|
|
"""
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--db-url',
|
|
default='sqlite:///./sneakyscanner.db',
|
|
help='Database URL (default: sqlite:///./sneakyscanner.db)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--password',
|
|
help='Set application password'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--verify-only',
|
|
action='store_true',
|
|
help='Only verify database, do not initialize'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--no-migrations',
|
|
action='store_true',
|
|
help='Create tables directly instead of using migrations'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--force',
|
|
action='store_true',
|
|
help='Force initialization without prompting (for non-interactive environments)'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Check if database already exists
|
|
db_exists = False
|
|
if args.db_url.startswith('sqlite:///'):
|
|
db_path = args.db_url.replace('sqlite:///', '')
|
|
db_exists = Path(db_path).exists()
|
|
|
|
if db_exists and not args.verify_only and not args.force:
|
|
response = input(f"\nDatabase already exists at {db_path}. Reinitialize? (y/N): ")
|
|
if response.lower() != 'y':
|
|
print("Aborting.")
|
|
return
|
|
|
|
try:
|
|
if args.verify_only:
|
|
verify_database(args.db_url)
|
|
else:
|
|
# Initialize database
|
|
init_database(args.db_url, run_migrations=not args.no_migrations)
|
|
|
|
# Set password if provided
|
|
if args.password:
|
|
set_password(args.db_url, args.password)
|
|
|
|
# Verify
|
|
verify_database(args.db_url)
|
|
|
|
print("\n✓ All done! Database is ready to use.")
|
|
|
|
if not args.password and not args.verify_only:
|
|
print("\n⚠ WARNING: No password set. Run with --password to set one:")
|
|
print(f" python3 init_db.py --db-url {args.db_url} --password YOUR_PASSWORD")
|
|
|
|
except Exception as e:
|
|
print(f"\n✗ Initialization failed: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|