324 lines
9.8 KiB
Python
324 lines
9.8 KiB
Python
"""
|
|
Settings management system for SneakyScanner.
|
|
|
|
Provides secure storage and retrieval of application settings with encryption
|
|
for sensitive values like passwords and API tokens.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import bcrypt
|
|
from cryptography.fernet import Fernet
|
|
from sqlalchemy.orm import Session
|
|
|
|
from web.models import Setting
|
|
|
|
|
|
class SettingsManager:
|
|
"""
|
|
Manages application settings with encryption support.
|
|
|
|
Handles CRUD operations for settings stored in the database, with automatic
|
|
encryption/decryption for sensitive values.
|
|
"""
|
|
|
|
# Keys that should be encrypted when stored
|
|
ENCRYPTED_KEYS = {
|
|
'smtp_password',
|
|
'api_token',
|
|
'encryption_key',
|
|
}
|
|
|
|
def __init__(self, db_session: Session, encryption_key: Optional[bytes] = None):
|
|
"""
|
|
Initialize the settings manager.
|
|
|
|
Args:
|
|
db_session: SQLAlchemy database session
|
|
encryption_key: Fernet encryption key (32 url-safe base64-encoded bytes)
|
|
If not provided, will generate or load from environment
|
|
"""
|
|
self.db = db_session
|
|
self._encryption_key = encryption_key or self._get_or_create_encryption_key()
|
|
self._cipher = Fernet(self._encryption_key)
|
|
|
|
def _get_or_create_encryption_key(self) -> bytes:
|
|
"""
|
|
Get encryption key from environment or generate new one.
|
|
|
|
Returns:
|
|
Fernet encryption key (32 url-safe base64-encoded bytes)
|
|
"""
|
|
# Try to get from environment variable
|
|
key_str = os.environ.get('SNEAKYSCANNER_ENCRYPTION_KEY')
|
|
if key_str:
|
|
return key_str.encode()
|
|
|
|
# Try to get from settings table (for persistence)
|
|
existing_key = self.get('encryption_key', decrypt=False)
|
|
if existing_key:
|
|
return existing_key.encode()
|
|
|
|
# Generate new key if none exists
|
|
new_key = Fernet.generate_key()
|
|
# Store it in settings (unencrypted, as it's the key itself)
|
|
self._store_raw('encryption_key', new_key.decode())
|
|
return new_key
|
|
|
|
def _store_raw(self, key: str, value: str) -> None:
|
|
"""Store a setting without encryption (internal use only)."""
|
|
setting = self.db.query(Setting).filter_by(key=key).first()
|
|
if setting:
|
|
setting.value = value
|
|
setting.updated_at = datetime.utcnow()
|
|
else:
|
|
setting = Setting(key=key, value=value)
|
|
self.db.add(setting)
|
|
self.db.commit()
|
|
|
|
def _should_encrypt(self, key: str) -> bool:
|
|
"""Check if a setting key should be encrypted."""
|
|
return key in self.ENCRYPTED_KEYS
|
|
|
|
def _encrypt(self, value: str) -> str:
|
|
"""Encrypt a string value."""
|
|
return self._cipher.encrypt(value.encode()).decode()
|
|
|
|
def _decrypt(self, encrypted_value: str) -> str:
|
|
"""Decrypt an encrypted value."""
|
|
return self._cipher.decrypt(encrypted_value.encode()).decode()
|
|
|
|
def get(self, key: str, default: Any = None, decrypt: bool = True) -> Any:
|
|
"""
|
|
Get a setting value by key.
|
|
|
|
Args:
|
|
key: Setting key to retrieve
|
|
default: Default value if key not found
|
|
decrypt: Whether to decrypt if value is encrypted
|
|
|
|
Returns:
|
|
Setting value (automatically decrypts if needed and decrypt=True)
|
|
"""
|
|
setting = self.db.query(Setting).filter_by(key=key).first()
|
|
if not setting:
|
|
return default
|
|
|
|
value = setting.value
|
|
if value is None:
|
|
return default
|
|
|
|
# Decrypt if needed
|
|
if decrypt and self._should_encrypt(key):
|
|
try:
|
|
value = self._decrypt(value)
|
|
except Exception:
|
|
# If decryption fails, return as-is (might be legacy unencrypted value)
|
|
pass
|
|
|
|
# Try to parse JSON for complex types
|
|
if value.startswith('[') or value.startswith('{'):
|
|
try:
|
|
return json.loads(value)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return value
|
|
|
|
def set(self, key: str, value: Any, encrypt: bool = None) -> None:
|
|
"""
|
|
Set a setting value.
|
|
|
|
Args:
|
|
key: Setting key
|
|
value: Setting value (will be JSON-encoded if dict/list)
|
|
encrypt: Force encryption on/off (None = auto-detect from ENCRYPTED_KEYS)
|
|
"""
|
|
# Convert complex types to JSON
|
|
if isinstance(value, (dict, list)):
|
|
value_str = json.dumps(value)
|
|
else:
|
|
value_str = str(value)
|
|
|
|
# Determine if we should encrypt
|
|
should_encrypt = encrypt if encrypt is not None else self._should_encrypt(key)
|
|
|
|
if should_encrypt:
|
|
value_str = self._encrypt(value_str)
|
|
|
|
# Store in database
|
|
setting = self.db.query(Setting).filter_by(key=key).first()
|
|
if setting:
|
|
setting.value = value_str
|
|
setting.updated_at = datetime.utcnow()
|
|
else:
|
|
setting = Setting(key=key, value=value_str)
|
|
self.db.add(setting)
|
|
|
|
self.db.commit()
|
|
|
|
def delete(self, key: str) -> bool:
|
|
"""
|
|
Delete a setting.
|
|
|
|
Args:
|
|
key: Setting key to delete
|
|
|
|
Returns:
|
|
True if deleted, False if key not found
|
|
"""
|
|
setting = self.db.query(Setting).filter_by(key=key).first()
|
|
if setting:
|
|
self.db.delete(setting)
|
|
self.db.commit()
|
|
return True
|
|
return False
|
|
|
|
def get_all(self, decrypt: bool = False, sanitize: bool = True) -> Dict[str, Any]:
|
|
"""
|
|
Get all settings as a dictionary.
|
|
|
|
Args:
|
|
decrypt: Whether to decrypt encrypted values
|
|
sanitize: If True, replaces encrypted values with '***' for security
|
|
|
|
Returns:
|
|
Dictionary of all settings
|
|
"""
|
|
settings = self.db.query(Setting).all()
|
|
result = {}
|
|
|
|
for setting in settings:
|
|
key = setting.key
|
|
value = setting.value
|
|
|
|
if value is None:
|
|
result[key] = None
|
|
continue
|
|
|
|
# Handle sanitization for sensitive keys
|
|
if sanitize and self._should_encrypt(key):
|
|
result[key] = '***ENCRYPTED***'
|
|
continue
|
|
|
|
# Decrypt if requested
|
|
if decrypt and self._should_encrypt(key):
|
|
try:
|
|
value = self._decrypt(value)
|
|
except Exception:
|
|
pass
|
|
|
|
# Try to parse JSON
|
|
if value and (value.startswith('[') or value.startswith('{')):
|
|
try:
|
|
value = json.loads(value)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
result[key] = value
|
|
|
|
return result
|
|
|
|
def init_defaults(self) -> None:
|
|
"""
|
|
Initialize default settings if they don't exist.
|
|
|
|
This should be called on first app startup to populate default values.
|
|
"""
|
|
defaults = {
|
|
# SMTP settings
|
|
'smtp_server': 'localhost',
|
|
'smtp_port': 587,
|
|
'smtp_username': '',
|
|
'smtp_password': '',
|
|
'smtp_from_email': 'noreply@sneakyscanner.local',
|
|
'smtp_to_emails': [],
|
|
|
|
# Authentication
|
|
'app_password': '', # Will need to be set by user
|
|
|
|
# Retention policy
|
|
'retention_days': 0, # 0 = keep forever
|
|
|
|
# Alert settings
|
|
'cert_expiry_threshold': 30, # Days before expiry to alert
|
|
'email_alerts_enabled': False,
|
|
}
|
|
|
|
for key, value in defaults.items():
|
|
# Only set if doesn't exist
|
|
if self.db.query(Setting).filter_by(key=key).first() is None:
|
|
self.set(key, value)
|
|
|
|
|
|
class PasswordManager:
|
|
"""
|
|
Manages password hashing and verification using bcrypt.
|
|
|
|
Used for the single-user authentication system.
|
|
"""
|
|
|
|
@staticmethod
|
|
def hash_password(password: str) -> str:
|
|
"""
|
|
Hash a password using bcrypt.
|
|
|
|
Args:
|
|
password: Plain text password
|
|
|
|
Returns:
|
|
Bcrypt hash string
|
|
"""
|
|
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
|
|
|
@staticmethod
|
|
def verify_password(password: str, hashed: str) -> bool:
|
|
"""
|
|
Verify a password against a bcrypt hash.
|
|
|
|
Args:
|
|
password: Plain text password to verify
|
|
hashed: Bcrypt hash to check against
|
|
|
|
Returns:
|
|
True if password matches, False otherwise
|
|
"""
|
|
try:
|
|
return bcrypt.checkpw(password.encode(), hashed.encode())
|
|
except Exception:
|
|
return False
|
|
|
|
@staticmethod
|
|
def set_app_password(settings_manager: SettingsManager, password: str) -> None:
|
|
"""
|
|
Set the application password (stored as bcrypt hash).
|
|
|
|
Args:
|
|
settings_manager: SettingsManager instance
|
|
password: New password to set
|
|
"""
|
|
hashed = PasswordManager.hash_password(password)
|
|
# Password hash stored as regular setting (not encrypted, as it's already a hash)
|
|
settings_manager.set('app_password', hashed, encrypt=False)
|
|
|
|
@staticmethod
|
|
def verify_app_password(settings_manager: SettingsManager, password: str) -> bool:
|
|
"""
|
|
Verify the application password.
|
|
|
|
Args:
|
|
settings_manager: SettingsManager instance
|
|
password: Password to verify
|
|
|
|
Returns:
|
|
True if password matches, False otherwise
|
|
"""
|
|
stored_hash = settings_manager.get('app_password', decrypt=False)
|
|
if not stored_hash:
|
|
# No password set - should prompt user to create one
|
|
return False
|
|
return PasswordManager.verify_password(password, stored_hash)
|