Complete Phase 1: Foundation - Flask web application infrastructure

Implement complete database schema and Flask application structure for
SneakyScan web interface. This establishes the foundation for web-based
scan management, scheduling, and visualization.

Database & ORM:
- Add 11 SQLAlchemy models for comprehensive scan data storage
  (Scan, ScanSite, ScanIP, ScanPort, ScanService, ScanCertificate,
  ScanTLSVersion, Schedule, Alert, AlertRule, Setting)
- Configure Alembic migrations system with initial schema migration
- Add init_db.py script for database initialization and password setup
- Support both migration-based and direct table creation

Settings System:
- Implement SettingsManager with automatic encryption for sensitive values
- Add Fernet encryption for SMTP passwords and API tokens
- Implement PasswordManager with bcrypt password hashing (work factor 12)
- Initialize default settings for SMTP, authentication, and retention

Flask Application:
- Create Flask app factory pattern with scoped session management
- Add 4 API blueprints: scans, schedules, alerts, settings
- Implement functional Settings API (GET/PUT/DELETE endpoints)
- Add CORS support, error handlers, and request/response logging
- Configure development and production logging to file and console

Docker & Deployment:
- Update Dockerfile to install Flask dependencies
- Add docker-compose-web.yml for web application deployment
- Configure volume mounts for database, output, and logs persistence
- Expose port 5000 for Flask web server

Testing & Validation:
- Add validate_phase1.py script to verify all deliverables
- Validate directory structure, Python syntax, models, and endpoints
- All validation checks passing

Documentation:
- Add PHASE1_COMPLETE.md with comprehensive Phase 1 summary
- Update ROADMAP.md with Phase 1 completion status
- Update .gitignore to exclude database files and documentation

Files changed: 21 files
- New: web/ directory with complete Flask app structure
- New: migrations/ with Alembic configuration
- New: requirements-web.txt with Flask dependencies
- Modified: Dockerfile, ROADMAP.md, .gitignore
This commit is contained in:
2025-11-13 23:59:23 -06:00
parent e29c839d80
commit 986c0d3d17
22 changed files with 3138 additions and 42 deletions

0
web/__init__.py Normal file
View File

0
web/api/__init__.py Normal file
View File

137
web/api/alerts.py Normal file
View File

@@ -0,0 +1,137 @@
"""
Alerts API blueprint.
Handles endpoints for viewing alert history and managing alert rules.
"""
from flask import Blueprint, jsonify, request
bp = Blueprint('alerts', __name__)
@bp.route('', methods=['GET'])
def list_alerts():
"""
List recent alerts.
Query params:
page: Page number (default: 1)
per_page: Items per page (default: 20)
alert_type: Filter by alert type
severity: Filter by severity (info, warning, critical)
start_date: Filter alerts after this date
end_date: Filter alerts before this date
Returns:
JSON response with alerts list
"""
# TODO: Implement in Phase 4
return jsonify({
'alerts': [],
'total': 0,
'page': 1,
'per_page': 20,
'message': 'Alerts list endpoint - to be implemented in Phase 4'
})
@bp.route('/rules', methods=['GET'])
def list_alert_rules():
"""
List all alert rules.
Returns:
JSON response with alert rules
"""
# TODO: Implement in Phase 4
return jsonify({
'rules': [],
'message': 'Alert rules list endpoint - to be implemented in Phase 4'
})
@bp.route('/rules', methods=['POST'])
def create_alert_rule():
"""
Create a new alert rule.
Request body:
rule_type: Type of alert rule
threshold: Threshold value (e.g., days for cert expiry)
enabled: Whether rule is active (default: true)
email_enabled: Send email for this rule (default: false)
Returns:
JSON response with created rule ID
"""
# TODO: Implement in Phase 4
data = request.get_json() or {}
return jsonify({
'rule_id': None,
'status': 'not_implemented',
'message': 'Alert rule creation endpoint - to be implemented in Phase 4',
'data': data
}), 501
@bp.route('/rules/<int:rule_id>', methods=['PUT'])
def update_alert_rule(rule_id):
"""
Update an existing alert rule.
Args:
rule_id: Alert rule ID to update
Request body:
threshold: Threshold value (optional)
enabled: Whether rule is active (optional)
email_enabled: Send email for this rule (optional)
Returns:
JSON response with update status
"""
# TODO: Implement in Phase 4
data = request.get_json() or {}
return jsonify({
'rule_id': rule_id,
'status': 'not_implemented',
'message': 'Alert rule update endpoint - to be implemented in Phase 4',
'data': data
}), 501
@bp.route('/rules/<int:rule_id>', methods=['DELETE'])
def delete_alert_rule(rule_id):
"""
Delete an alert rule.
Args:
rule_id: Alert rule ID to delete
Returns:
JSON response with deletion status
"""
# TODO: Implement in Phase 4
return jsonify({
'rule_id': rule_id,
'status': 'not_implemented',
'message': 'Alert rule deletion endpoint - to be implemented in Phase 4'
}), 501
# Health check endpoint
@bp.route('/health', methods=['GET'])
def health_check():
"""
Health check endpoint for monitoring.
Returns:
JSON response with API health status
"""
return jsonify({
'status': 'healthy',
'api': 'alerts',
'version': '1.0.0-phase1'
})

150
web/api/scans.py Normal file
View File

@@ -0,0 +1,150 @@
"""
Scans API blueprint.
Handles endpoints for triggering scans, listing scan history, and retrieving
scan results.
"""
from flask import Blueprint, current_app, jsonify, request
bp = Blueprint('scans', __name__)
@bp.route('', methods=['GET'])
def list_scans():
"""
List all scans with pagination.
Query params:
page: Page number (default: 1)
per_page: Items per page (default: 20, max: 100)
status: Filter by status (running, completed, failed)
Returns:
JSON response with scans list and pagination info
"""
# TODO: Implement in Phase 2
return jsonify({
'scans': [],
'total': 0,
'page': 1,
'per_page': 20,
'message': 'Scans endpoint - to be implemented in Phase 2'
})
@bp.route('/<int:scan_id>', methods=['GET'])
def get_scan(scan_id):
"""
Get details for a specific scan.
Args:
scan_id: Scan ID
Returns:
JSON response with scan details
"""
# TODO: Implement in Phase 2
return jsonify({
'scan_id': scan_id,
'message': 'Scan detail endpoint - to be implemented in Phase 2'
})
@bp.route('', methods=['POST'])
def trigger_scan():
"""
Trigger a new scan.
Request body:
config_file: Path to YAML config file
Returns:
JSON response with scan_id and status
"""
# TODO: Implement in Phase 2
data = request.get_json() or {}
config_file = data.get('config_file')
return jsonify({
'scan_id': None,
'status': 'not_implemented',
'message': 'Scan trigger endpoint - to be implemented in Phase 2',
'config_file': config_file
}), 501 # Not Implemented
@bp.route('/<int:scan_id>', methods=['DELETE'])
def delete_scan(scan_id):
"""
Delete a scan and its associated files.
Args:
scan_id: Scan ID to delete
Returns:
JSON response with deletion status
"""
# TODO: Implement in Phase 2
return jsonify({
'scan_id': scan_id,
'status': 'not_implemented',
'message': 'Scan deletion endpoint - to be implemented in Phase 2'
}), 501
@bp.route('/<int:scan_id>/status', methods=['GET'])
def get_scan_status(scan_id):
"""
Get current status of a running scan.
Args:
scan_id: Scan ID
Returns:
JSON response with scan status and progress
"""
# TODO: Implement in Phase 2
return jsonify({
'scan_id': scan_id,
'status': 'not_implemented',
'progress': '0%',
'message': 'Scan status endpoint - to be implemented in Phase 2'
})
@bp.route('/<int:scan_id1>/compare/<int:scan_id2>', methods=['GET'])
def compare_scans(scan_id1, scan_id2):
"""
Compare two scans and show differences.
Args:
scan_id1: First scan ID
scan_id2: Second scan ID
Returns:
JSON response with comparison results
"""
# TODO: Implement in Phase 4
return jsonify({
'scan_id1': scan_id1,
'scan_id2': scan_id2,
'diff': {},
'message': 'Scan comparison endpoint - to be implemented in Phase 4'
})
# Health check endpoint
@bp.route('/health', methods=['GET'])
def health_check():
"""
Health check endpoint for monitoring.
Returns:
JSON response with API health status
"""
return jsonify({
'status': 'healthy',
'api': 'scans',
'version': '1.0.0-phase1'
})

150
web/api/schedules.py Normal file
View File

@@ -0,0 +1,150 @@
"""
Schedules API blueprint.
Handles endpoints for managing scheduled scans including CRUD operations
and manual triggering.
"""
from flask import Blueprint, jsonify, request
bp = Blueprint('schedules', __name__)
@bp.route('', methods=['GET'])
def list_schedules():
"""
List all schedules.
Returns:
JSON response with schedules list
"""
# TODO: Implement in Phase 3
return jsonify({
'schedules': [],
'message': 'Schedules list endpoint - to be implemented in Phase 3'
})
@bp.route('/<int:schedule_id>', methods=['GET'])
def get_schedule(schedule_id):
"""
Get details for a specific schedule.
Args:
schedule_id: Schedule ID
Returns:
JSON response with schedule details
"""
# TODO: Implement in Phase 3
return jsonify({
'schedule_id': schedule_id,
'message': 'Schedule detail endpoint - to be implemented in Phase 3'
})
@bp.route('', methods=['POST'])
def create_schedule():
"""
Create a new schedule.
Request body:
name: Schedule name
config_file: Path to YAML config
cron_expression: Cron-like schedule expression
Returns:
JSON response with created schedule ID
"""
# TODO: Implement in Phase 3
data = request.get_json() or {}
return jsonify({
'schedule_id': None,
'status': 'not_implemented',
'message': 'Schedule creation endpoint - to be implemented in Phase 3',
'data': data
}), 501
@bp.route('/<int:schedule_id>', methods=['PUT'])
def update_schedule(schedule_id):
"""
Update an existing schedule.
Args:
schedule_id: Schedule ID to update
Request body:
name: Schedule name (optional)
config_file: Path to YAML config (optional)
cron_expression: Cron-like schedule expression (optional)
enabled: Whether schedule is active (optional)
Returns:
JSON response with update status
"""
# TODO: Implement in Phase 3
data = request.get_json() or {}
return jsonify({
'schedule_id': schedule_id,
'status': 'not_implemented',
'message': 'Schedule update endpoint - to be implemented in Phase 3',
'data': data
}), 501
@bp.route('/<int:schedule_id>', methods=['DELETE'])
def delete_schedule(schedule_id):
"""
Delete a schedule.
Args:
schedule_id: Schedule ID to delete
Returns:
JSON response with deletion status
"""
# TODO: Implement in Phase 3
return jsonify({
'schedule_id': schedule_id,
'status': 'not_implemented',
'message': 'Schedule deletion endpoint - to be implemented in Phase 3'
}), 501
@bp.route('/<int:schedule_id>/trigger', methods=['POST'])
def trigger_schedule(schedule_id):
"""
Manually trigger a scheduled scan.
Args:
schedule_id: Schedule ID to trigger
Returns:
JSON response with triggered scan ID
"""
# TODO: Implement in Phase 3
return jsonify({
'schedule_id': schedule_id,
'scan_id': None,
'status': 'not_implemented',
'message': 'Manual schedule trigger endpoint - to be implemented in Phase 3'
}), 501
# Health check endpoint
@bp.route('/health', methods=['GET'])
def health_check():
"""
Health check endpoint for monitoring.
Returns:
JSON response with API health status
"""
return jsonify({
'status': 'healthy',
'api': 'schedules',
'version': '1.0.0-phase1'
})

267
web/api/settings.py Normal file
View File

@@ -0,0 +1,267 @@
"""
Settings API blueprint.
Handles endpoints for managing application settings including SMTP configuration,
authentication, and system preferences.
"""
from flask import Blueprint, current_app, jsonify, request
from web.utils.settings import PasswordManager, SettingsManager
bp = Blueprint('settings', __name__)
def get_settings_manager():
"""Get SettingsManager instance with current DB session."""
return SettingsManager(current_app.db_session)
@bp.route('', methods=['GET'])
def get_settings():
"""
Get all settings (sanitized - encrypted values masked).
Returns:
JSON response with all settings
"""
try:
settings_manager = get_settings_manager()
settings = settings_manager.get_all(decrypt=False, sanitize=True)
return jsonify({
'status': 'success',
'settings': settings
})
except Exception as e:
current_app.logger.error(f"Failed to retrieve settings: {e}")
return jsonify({
'status': 'error',
'message': 'Failed to retrieve settings'
}), 500
@bp.route('', methods=['PUT'])
def update_settings():
"""
Update multiple settings at once.
Request body:
settings: Dictionary of setting key-value pairs
Returns:
JSON response with update status
"""
# TODO: Add authentication in Phase 2
data = request.get_json() or {}
settings_dict = data.get('settings', {})
if not settings_dict:
return jsonify({
'status': 'error',
'message': 'No settings provided'
}), 400
try:
settings_manager = get_settings_manager()
# Update each setting
for key, value in settings_dict.items():
settings_manager.set(key, value)
return jsonify({
'status': 'success',
'message': f'Updated {len(settings_dict)} settings'
})
except Exception as e:
current_app.logger.error(f"Failed to update settings: {e}")
return jsonify({
'status': 'error',
'message': 'Failed to update settings'
}), 500
@bp.route('/<string:key>', methods=['GET'])
def get_setting(key):
"""
Get a specific setting by key.
Args:
key: Setting key
Returns:
JSON response with setting value
"""
try:
settings_manager = get_settings_manager()
value = settings_manager.get(key)
if value is None:
return jsonify({
'status': 'error',
'message': f'Setting "{key}" not found'
}), 404
# Sanitize if encrypted key
if settings_manager._should_encrypt(key):
value = '***ENCRYPTED***'
return jsonify({
'status': 'success',
'key': key,
'value': value
})
except Exception as e:
current_app.logger.error(f"Failed to retrieve setting {key}: {e}")
return jsonify({
'status': 'error',
'message': 'Failed to retrieve setting'
}), 500
@bp.route('/<string:key>', methods=['PUT'])
def update_setting(key):
"""
Update a specific setting.
Args:
key: Setting key
Request body:
value: New value for the setting
Returns:
JSON response with update status
"""
# TODO: Add authentication in Phase 2
data = request.get_json() or {}
value = data.get('value')
if value is None:
return jsonify({
'status': 'error',
'message': 'No value provided'
}), 400
try:
settings_manager = get_settings_manager()
settings_manager.set(key, value)
return jsonify({
'status': 'success',
'message': f'Setting "{key}" updated'
})
except Exception as e:
current_app.logger.error(f"Failed to update setting {key}: {e}")
return jsonify({
'status': 'error',
'message': 'Failed to update setting'
}), 500
@bp.route('/<string:key>', methods=['DELETE'])
def delete_setting(key):
"""
Delete a setting.
Args:
key: Setting key to delete
Returns:
JSON response with deletion status
"""
# TODO: Add authentication in Phase 2
try:
settings_manager = get_settings_manager()
deleted = settings_manager.delete(key)
if not deleted:
return jsonify({
'status': 'error',
'message': f'Setting "{key}" not found'
}), 404
return jsonify({
'status': 'success',
'message': f'Setting "{key}" deleted'
})
except Exception as e:
current_app.logger.error(f"Failed to delete setting {key}: {e}")
return jsonify({
'status': 'error',
'message': 'Failed to delete setting'
}), 500
@bp.route('/password', methods=['POST'])
def set_password():
"""
Set the application password.
Request body:
password: New password
Returns:
JSON response with status
"""
# TODO: Add current password verification in Phase 2
data = request.get_json() or {}
password = data.get('password')
if not password:
return jsonify({
'status': 'error',
'message': 'No password provided'
}), 400
if len(password) < 8:
return jsonify({
'status': 'error',
'message': 'Password must be at least 8 characters'
}), 400
try:
settings_manager = get_settings_manager()
PasswordManager.set_app_password(settings_manager, password)
return jsonify({
'status': 'success',
'message': 'Password updated successfully'
})
except Exception as e:
current_app.logger.error(f"Failed to set password: {e}")
return jsonify({
'status': 'error',
'message': 'Failed to set password'
}), 500
@bp.route('/test-email', methods=['POST'])
def test_email():
"""
Test email configuration by sending a test email.
Returns:
JSON response with test result
"""
# TODO: Implement in Phase 4 (email support)
return jsonify({
'status': 'not_implemented',
'message': 'Email testing endpoint - to be implemented in Phase 4'
}), 501
# Health check endpoint
@bp.route('/health', methods=['GET'])
def health_check():
"""
Health check endpoint for monitoring.
Returns:
JSON response with API health status
"""
return jsonify({
'status': 'healthy',
'api': 'settings',
'version': '1.0.0-phase1'
})

292
web/app.py Normal file
View File

@@ -0,0 +1,292 @@
"""
Flask application factory for SneakyScanner web interface.
This module creates and configures the Flask application with all necessary
extensions, blueprints, and middleware.
"""
import logging
import os
from pathlib import Path
from flask import Flask, jsonify
from flask_cors import CORS
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from web.models import Base
def create_app(config: dict = None) -> Flask:
"""
Create and configure the Flask application.
Args:
config: Optional configuration dictionary to override defaults
Returns:
Configured Flask application instance
"""
app = Flask(__name__,
instance_relative_config=True,
static_folder='static',
template_folder='templates')
# Load default configuration
app.config.from_mapping(
SECRET_KEY=os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production'),
SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL', 'sqlite:///./sneakyscanner.db'),
SQLALCHEMY_TRACK_MODIFICATIONS=False,
JSON_SORT_KEYS=False, # Preserve order in JSON responses
MAX_CONTENT_LENGTH=50 * 1024 * 1024, # 50MB max upload size
)
# Override with custom config if provided
if config:
app.config.update(config)
# Ensure instance folder exists
try:
os.makedirs(app.instance_path, exist_ok=True)
except OSError:
pass
# Configure logging
configure_logging(app)
# Initialize database
init_database(app)
# Initialize extensions
init_extensions(app)
# Register blueprints
register_blueprints(app)
# Register error handlers
register_error_handlers(app)
# Add request/response handlers
register_request_handlers(app)
app.logger.info("SneakyScanner Flask app initialized")
return app
def configure_logging(app: Flask) -> None:
"""
Configure application logging.
Args:
app: Flask application instance
"""
# Set log level from environment or default to INFO
log_level = os.environ.get('LOG_LEVEL', 'INFO').upper()
app.logger.setLevel(getattr(logging, log_level, logging.INFO))
# Create logs directory if it doesn't exist
log_dir = Path('logs')
log_dir.mkdir(exist_ok=True)
# File handler for all logs
file_handler = logging.FileHandler(log_dir / 'sneakyscanner.log')
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(file_formatter)
app.logger.addHandler(file_handler)
# Console handler for development
if app.debug:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_formatter = logging.Formatter(
'[%(levelname)s] %(name)s: %(message)s'
)
console_handler.setFormatter(console_formatter)
app.logger.addHandler(console_handler)
def init_database(app: Flask) -> None:
"""
Initialize database connection and session management.
Args:
app: Flask application instance
"""
# Create engine
engine = create_engine(
app.config['SQLALCHEMY_DATABASE_URI'],
echo=app.debug, # Log SQL in debug mode
pool_pre_ping=True, # Verify connections before using
pool_recycle=3600, # Recycle connections after 1 hour
)
# Create scoped session factory
db_session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
)
# Store session in app for use in views
app.db_session = db_session
# Create tables if they don't exist (for development)
# In production, use Alembic migrations instead
if app.debug:
Base.metadata.create_all(bind=engine)
@app.teardown_appcontext
def shutdown_session(exception=None):
"""Remove database session at end of request."""
db_session.remove()
app.logger.info(f"Database initialized: {app.config['SQLALCHEMY_DATABASE_URI']}")
def init_extensions(app: Flask) -> None:
"""
Initialize Flask extensions.
Args:
app: Flask application instance
"""
# CORS support for API
CORS(app, resources={
r"/api/*": {
"origins": os.environ.get('CORS_ORIGINS', '*').split(','),
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization"],
}
})
app.logger.info("Extensions initialized")
def register_blueprints(app: Flask) -> None:
"""
Register Flask blueprints for different app sections.
Args:
app: Flask application instance
"""
# Import blueprints
from web.api.scans import bp as scans_bp
from web.api.schedules import bp as schedules_bp
from web.api.alerts import bp as alerts_bp
from web.api.settings import bp as settings_bp
# Register API blueprints
app.register_blueprint(scans_bp, url_prefix='/api/scans')
app.register_blueprint(schedules_bp, url_prefix='/api/schedules')
app.register_blueprint(alerts_bp, url_prefix='/api/alerts')
app.register_blueprint(settings_bp, url_prefix='/api/settings')
app.logger.info("Blueprints registered")
def register_error_handlers(app: Flask) -> None:
"""
Register error handlers for common HTTP errors.
Args:
app: Flask application instance
"""
@app.errorhandler(400)
def bad_request(error):
return jsonify({
'error': 'Bad Request',
'message': str(error) or 'The request was invalid'
}), 400
@app.errorhandler(401)
def unauthorized(error):
return jsonify({
'error': 'Unauthorized',
'message': 'Authentication required'
}), 401
@app.errorhandler(403)
def forbidden(error):
return jsonify({
'error': 'Forbidden',
'message': 'You do not have permission to access this resource'
}), 403
@app.errorhandler(404)
def not_found(error):
return jsonify({
'error': 'Not Found',
'message': 'The requested resource was not found'
}), 404
@app.errorhandler(405)
def method_not_allowed(error):
return jsonify({
'error': 'Method Not Allowed',
'message': 'The HTTP method is not allowed for this endpoint'
}), 405
@app.errorhandler(500)
def internal_server_error(error):
app.logger.error(f"Internal server error: {error}")
return jsonify({
'error': 'Internal Server Error',
'message': 'An unexpected error occurred'
}), 500
def register_request_handlers(app: Flask) -> None:
"""
Register request and response handlers.
Args:
app: Flask application instance
"""
@app.before_request
def log_request():
"""Log incoming requests."""
if app.debug:
app.logger.debug(f"{request.method} {request.path}")
@app.after_request
def add_security_headers(response):
"""Add security headers to all responses."""
# Only add CORS and security headers for API routes
if request.path.startswith('/api/'):
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
return response
# Import request at runtime to avoid circular imports
from flask import request
# Re-apply to ensure request is available
@app.before_request
def log_request():
"""Log incoming requests."""
if app.debug:
app.logger.debug(f"{request.method} {request.path}")
# Development server entry point
def main():
"""Run development server."""
app = create_app()
app.run(
host=os.environ.get('FLASK_HOST', '0.0.0.0'),
port=int(os.environ.get('FLASK_PORT', 5000)),
debug=os.environ.get('FLASK_DEBUG', 'True').lower() == 'true'
)
if __name__ == '__main__':
main()

345
web/models.py Normal file
View File

@@ -0,0 +1,345 @@
"""
SQLAlchemy models for SneakyScanner database.
This module defines all database tables for storing scan results, schedules,
alerts, and application settings. The schema supports the full scanning workflow
from port discovery through service detection and SSL/TLS analysis.
"""
from datetime import datetime
from typing import Optional
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import DeclarativeBase, relationship
class Base(DeclarativeBase):
"""Base class for all SQLAlchemy models."""
pass
# ============================================================================
# Core Scan Tables
# ============================================================================
class Scan(Base):
"""
Stores metadata about each scan execution.
This is the parent table that ties together all scan results including
sites, IPs, ports, services, certificates, and TLS configuration.
"""
__tablename__ = 'scans'
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(DateTime, nullable=False, index=True, comment="Scan start time (UTC)")
duration = Column(Float, nullable=True, comment="Total scan duration in seconds")
status = Column(String(20), nullable=False, default='running', comment="running, completed, failed")
config_file = Column(Text, nullable=True, comment="Path to YAML config used")
title = Column(Text, nullable=True, comment="Scan title from config")
json_path = Column(Text, nullable=True, comment="Path to JSON report")
html_path = Column(Text, nullable=True, comment="Path to HTML report")
zip_path = Column(Text, nullable=True, comment="Path to ZIP archive")
screenshot_dir = Column(Text, nullable=True, comment="Path to screenshot directory")
created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="Record creation time")
triggered_by = Column(String(50), nullable=False, default='manual', comment="manual, scheduled, api")
schedule_id = Column(Integer, ForeignKey('schedules.id'), nullable=True, comment="FK to schedules if triggered by schedule")
# Relationships
sites = relationship('ScanSite', back_populates='scan', cascade='all, delete-orphan')
ips = relationship('ScanIP', back_populates='scan', cascade='all, delete-orphan')
ports = relationship('ScanPort', back_populates='scan', cascade='all, delete-orphan')
services = relationship('ScanService', back_populates='scan', cascade='all, delete-orphan')
certificates = relationship('ScanCertificate', back_populates='scan', cascade='all, delete-orphan')
tls_versions = relationship('ScanTLSVersion', back_populates='scan', cascade='all, delete-orphan')
alerts = relationship('Alert', back_populates='scan', cascade='all, delete-orphan')
schedule = relationship('Schedule', back_populates='scans')
def __repr__(self):
return f"<Scan(id={self.id}, title='{self.title}', status='{self.status}')>"
class ScanSite(Base):
"""
Logical grouping of IPs by site.
Sites represent logical network segments or locations (e.g., "Production DC",
"DMZ", "Branch Office") as defined in the scan configuration.
"""
__tablename__ = 'scan_sites'
id = Column(Integer, primary_key=True, autoincrement=True)
scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
site_name = Column(String(255), nullable=False, comment="Site name from config")
# Relationships
scan = relationship('Scan', back_populates='sites')
ips = relationship('ScanIP', back_populates='site', cascade='all, delete-orphan')
def __repr__(self):
return f"<ScanSite(id={self.id}, site_name='{self.site_name}')>"
class ScanIP(Base):
"""
IP addresses scanned in each scan.
Stores the target IPs and their ping response status (expected vs. actual).
"""
__tablename__ = 'scan_ips'
id = Column(Integer, primary_key=True, autoincrement=True)
scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
site_id = Column(Integer, ForeignKey('scan_sites.id'), nullable=False, index=True)
ip_address = Column(String(45), nullable=False, comment="IPv4 or IPv6 address")
ping_expected = Column(Boolean, nullable=True, comment="Expected ping response")
ping_actual = Column(Boolean, nullable=True, comment="Actual ping response")
# Relationships
scan = relationship('Scan', back_populates='ips')
site = relationship('ScanSite', back_populates='ips')
ports = relationship('ScanPort', back_populates='ip', cascade='all, delete-orphan')
# Index for efficient IP lookups within a scan
__table_args__ = (
UniqueConstraint('scan_id', 'ip_address', name='uix_scan_ip'),
)
def __repr__(self):
return f"<ScanIP(id={self.id}, ip_address='{self.ip_address}')>"
class ScanPort(Base):
"""
Discovered TCP/UDP ports.
Stores all open ports found during masscan phase, along with expected vs.
actual status for drift detection.
"""
__tablename__ = 'scan_ports'
id = Column(Integer, primary_key=True, autoincrement=True)
scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
ip_id = Column(Integer, ForeignKey('scan_ips.id'), nullable=False, index=True)
port = Column(Integer, nullable=False, comment="Port number (1-65535)")
protocol = Column(String(10), nullable=False, comment="tcp or udp")
expected = Column(Boolean, nullable=True, comment="Was this port expected?")
state = Column(String(20), nullable=False, default='open', comment="open, closed, filtered")
# Relationships
scan = relationship('Scan', back_populates='ports')
ip = relationship('ScanIP', back_populates='ports')
services = relationship('ScanService', back_populates='port', cascade='all, delete-orphan')
# Index for efficient port lookups
__table_args__ = (
UniqueConstraint('scan_id', 'ip_id', 'port', 'protocol', name='uix_scan_ip_port'),
)
def __repr__(self):
return f"<ScanPort(id={self.id}, port={self.port}, protocol='{self.protocol}')>"
class ScanService(Base):
"""
Detected services on open ports.
Stores nmap service detection results including product names, versions,
and HTTP/HTTPS information with screenshots.
"""
__tablename__ = 'scan_services'
id = Column(Integer, primary_key=True, autoincrement=True)
scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
port_id = Column(Integer, ForeignKey('scan_ports.id'), nullable=False, index=True)
service_name = Column(String(100), nullable=True, comment="Service name (e.g., ssh, http)")
product = Column(String(255), nullable=True, comment="Product name (e.g., OpenSSH)")
version = Column(String(100), nullable=True, comment="Version string")
extrainfo = Column(Text, nullable=True, comment="Additional nmap info")
ostype = Column(String(100), nullable=True, comment="OS type if detected")
http_protocol = Column(String(10), nullable=True, comment="http or https (if web service)")
screenshot_path = Column(Text, nullable=True, comment="Relative path to screenshot")
# Relationships
scan = relationship('Scan', back_populates='services')
port = relationship('ScanPort', back_populates='services')
certificates = relationship('ScanCertificate', back_populates='service', cascade='all, delete-orphan')
def __repr__(self):
return f"<ScanService(id={self.id}, service_name='{self.service_name}', product='{self.product}')>"
class ScanCertificate(Base):
"""
SSL/TLS certificates discovered on HTTPS services.
Stores certificate details including validity periods, subject/issuer,
and flags for self-signed certificates.
"""
__tablename__ = 'scan_certificates'
id = Column(Integer, primary_key=True, autoincrement=True)
scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
service_id = Column(Integer, ForeignKey('scan_services.id'), nullable=False, index=True)
subject = Column(Text, nullable=True, comment="Certificate subject (CN)")
issuer = Column(Text, nullable=True, comment="Certificate issuer")
serial_number = Column(Text, nullable=True, comment="Serial number")
not_valid_before = Column(DateTime, nullable=True, comment="Validity start date")
not_valid_after = Column(DateTime, nullable=True, comment="Validity end date")
days_until_expiry = Column(Integer, nullable=True, comment="Days until expiration")
sans = Column(Text, nullable=True, comment="JSON array of SANs")
is_self_signed = Column(Boolean, nullable=True, default=False, comment="Self-signed certificate flag")
# Relationships
scan = relationship('Scan', back_populates='certificates')
service = relationship('ScanService', back_populates='certificates')
tls_versions = relationship('ScanTLSVersion', back_populates='certificate', cascade='all, delete-orphan')
# Index for certificate expiration queries
__table_args__ = (
{'comment': 'Index on expiration date for alert queries'},
)
def __repr__(self):
return f"<ScanCertificate(id={self.id}, subject='{self.subject}', days_until_expiry={self.days_until_expiry})>"
class ScanTLSVersion(Base):
"""
TLS version support and cipher suites.
Stores which TLS versions (1.0, 1.1, 1.2, 1.3) are supported by each
HTTPS service, along with accepted cipher suites.
"""
__tablename__ = 'scan_tls_versions'
id = Column(Integer, primary_key=True, autoincrement=True)
scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
certificate_id = Column(Integer, ForeignKey('scan_certificates.id'), nullable=False, index=True)
tls_version = Column(String(20), nullable=False, comment="TLS 1.0, TLS 1.1, TLS 1.2, TLS 1.3")
supported = Column(Boolean, nullable=False, comment="Is this version supported?")
cipher_suites = Column(Text, nullable=True, comment="JSON array of cipher suites")
# Relationships
scan = relationship('Scan', back_populates='tls_versions')
certificate = relationship('ScanCertificate', back_populates='tls_versions')
def __repr__(self):
return f"<ScanTLSVersion(id={self.id}, tls_version='{self.tls_version}', supported={self.supported})>"
# ============================================================================
# Scheduling & Notifications Tables
# ============================================================================
class Schedule(Base):
"""
Scheduled scan configurations.
Stores cron-like schedules for automated periodic scanning of network
infrastructure.
"""
__tablename__ = 'schedules'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(255), nullable=False, comment="Schedule name (e.g., 'Daily prod scan')")
config_file = Column(Text, nullable=False, comment="Path to YAML config")
cron_expression = Column(String(100), nullable=False, comment="Cron-like schedule (e.g., '0 2 * * *')")
enabled = Column(Boolean, nullable=False, default=True, comment="Is schedule active?")
last_run = Column(DateTime, nullable=True, comment="Last execution time")
next_run = Column(DateTime, nullable=True, comment="Next scheduled execution")
created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="Schedule creation time")
updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment="Last modification time")
# Relationships
scans = relationship('Scan', back_populates='schedule')
def __repr__(self):
return f"<Schedule(id={self.id}, name='{self.name}', enabled={self.enabled})>"
class Alert(Base):
"""
Alert history and notifications sent.
Stores all alerts generated by the alert rule engine, including severity
levels and email notification status.
"""
__tablename__ = 'alerts'
id = Column(Integer, primary_key=True, autoincrement=True)
scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
alert_type = Column(String(50), nullable=False, comment="new_port, cert_expiry, service_change, ping_failed")
severity = Column(String(20), nullable=False, comment="info, warning, critical")
message = Column(Text, nullable=False, comment="Human-readable alert message")
ip_address = Column(String(45), nullable=True, comment="Related IP (optional)")
port = Column(Integer, nullable=True, comment="Related port (optional)")
email_sent = Column(Boolean, nullable=False, default=False, comment="Was email notification sent?")
email_sent_at = Column(DateTime, nullable=True, comment="Email send timestamp")
created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="Alert creation time")
# Relationships
scan = relationship('Scan', back_populates='alerts')
# Index for alert queries by type and severity
__table_args__ = (
{'comment': 'Indexes for alert filtering'},
)
def __repr__(self):
return f"<Alert(id={self.id}, alert_type='{self.alert_type}', severity='{self.severity}')>"
class AlertRule(Base):
"""
User-defined alert rules.
Configurable rules that trigger alerts based on scan results (e.g.,
certificates expiring in <30 days, unexpected ports opened).
"""
__tablename__ = 'alert_rules'
id = Column(Integer, primary_key=True, autoincrement=True)
rule_type = Column(String(50), nullable=False, comment="unexpected_port, cert_expiry, service_down, etc.")
enabled = Column(Boolean, nullable=False, default=True, comment="Is rule active?")
threshold = Column(Integer, nullable=True, comment="Threshold value (e.g., days for cert expiry)")
email_enabled = Column(Boolean, nullable=False, default=False, comment="Send email for this rule?")
created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="Rule creation time")
def __repr__(self):
return f"<AlertRule(id={self.id}, rule_type='{self.rule_type}', enabled={self.enabled})>"
# ============================================================================
# Settings Table
# ============================================================================
class Setting(Base):
"""
Application configuration key-value store.
Stores application settings including SMTP configuration, authentication,
and retention policies. Values stored as JSON for complex data types.
"""
__tablename__ = 'settings'
id = Column(Integer, primary_key=True, autoincrement=True)
key = Column(String(255), nullable=False, unique=True, index=True, comment="Setting key (e.g., smtp_server)")
value = Column(Text, nullable=True, comment="Setting value (JSON for complex values)")
updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment="Last modification time")
def __repr__(self):
return f"<Setting(key='{self.key}', value='{self.value[:50] if self.value else None}...')>"

0
web/utils/__init__.py Normal file
View File

323
web/utils/settings.py Normal file
View File

@@ -0,0 +1,323 @@
"""
Settings management system for SneakyScanner.
Provides secure storage and retrieval of application settings with encryption
for sensitive values like passwords and API tokens.
"""
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
import bcrypt
from cryptography.fernet import Fernet
from sqlalchemy.orm import Session
from web.models import Setting
class SettingsManager:
"""
Manages application settings with encryption support.
Handles CRUD operations for settings stored in the database, with automatic
encryption/decryption for sensitive values.
"""
# Keys that should be encrypted when stored
ENCRYPTED_KEYS = {
'smtp_password',
'api_token',
'encryption_key',
}
def __init__(self, db_session: Session, encryption_key: Optional[bytes] = None):
"""
Initialize the settings manager.
Args:
db_session: SQLAlchemy database session
encryption_key: Fernet encryption key (32 url-safe base64-encoded bytes)
If not provided, will generate or load from environment
"""
self.db = db_session
self._encryption_key = encryption_key or self._get_or_create_encryption_key()
self._cipher = Fernet(self._encryption_key)
def _get_or_create_encryption_key(self) -> bytes:
"""
Get encryption key from environment or generate new one.
Returns:
Fernet encryption key (32 url-safe base64-encoded bytes)
"""
# Try to get from environment variable
key_str = os.environ.get('SNEAKYSCANNER_ENCRYPTION_KEY')
if key_str:
return key_str.encode()
# Try to get from settings table (for persistence)
existing_key = self.get('encryption_key', decrypt=False)
if existing_key:
return existing_key.encode()
# Generate new key if none exists
new_key = Fernet.generate_key()
# Store it in settings (unencrypted, as it's the key itself)
self._store_raw('encryption_key', new_key.decode())
return new_key
def _store_raw(self, key: str, value: str) -> None:
"""Store a setting without encryption (internal use only)."""
setting = self.db.query(Setting).filter_by(key=key).first()
if setting:
setting.value = value
setting.updated_at = datetime.utcnow()
else:
setting = Setting(key=key, value=value)
self.db.add(setting)
self.db.commit()
def _should_encrypt(self, key: str) -> bool:
"""Check if a setting key should be encrypted."""
return key in self.ENCRYPTED_KEYS
def _encrypt(self, value: str) -> str:
"""Encrypt a string value."""
return self._cipher.encrypt(value.encode()).decode()
def _decrypt(self, encrypted_value: str) -> str:
"""Decrypt an encrypted value."""
return self._cipher.decrypt(encrypted_value.encode()).decode()
def get(self, key: str, default: Any = None, decrypt: bool = True) -> Any:
"""
Get a setting value by key.
Args:
key: Setting key to retrieve
default: Default value if key not found
decrypt: Whether to decrypt if value is encrypted
Returns:
Setting value (automatically decrypts if needed and decrypt=True)
"""
setting = self.db.query(Setting).filter_by(key=key).first()
if not setting:
return default
value = setting.value
if value is None:
return default
# Decrypt if needed
if decrypt and self._should_encrypt(key):
try:
value = self._decrypt(value)
except Exception:
# If decryption fails, return as-is (might be legacy unencrypted value)
pass
# Try to parse JSON for complex types
if value.startswith('[') or value.startswith('{'):
try:
return json.loads(value)
except json.JSONDecodeError:
pass
return value
def set(self, key: str, value: Any, encrypt: bool = None) -> None:
"""
Set a setting value.
Args:
key: Setting key
value: Setting value (will be JSON-encoded if dict/list)
encrypt: Force encryption on/off (None = auto-detect from ENCRYPTED_KEYS)
"""
# Convert complex types to JSON
if isinstance(value, (dict, list)):
value_str = json.dumps(value)
else:
value_str = str(value)
# Determine if we should encrypt
should_encrypt = encrypt if encrypt is not None else self._should_encrypt(key)
if should_encrypt:
value_str = self._encrypt(value_str)
# Store in database
setting = self.db.query(Setting).filter_by(key=key).first()
if setting:
setting.value = value_str
setting.updated_at = datetime.utcnow()
else:
setting = Setting(key=key, value=value_str)
self.db.add(setting)
self.db.commit()
def delete(self, key: str) -> bool:
"""
Delete a setting.
Args:
key: Setting key to delete
Returns:
True if deleted, False if key not found
"""
setting = self.db.query(Setting).filter_by(key=key).first()
if setting:
self.db.delete(setting)
self.db.commit()
return True
return False
def get_all(self, decrypt: bool = False, sanitize: bool = True) -> Dict[str, Any]:
"""
Get all settings as a dictionary.
Args:
decrypt: Whether to decrypt encrypted values
sanitize: If True, replaces encrypted values with '***' for security
Returns:
Dictionary of all settings
"""
settings = self.db.query(Setting).all()
result = {}
for setting in settings:
key = setting.key
value = setting.value
if value is None:
result[key] = None
continue
# Handle sanitization for sensitive keys
if sanitize and self._should_encrypt(key):
result[key] = '***ENCRYPTED***'
continue
# Decrypt if requested
if decrypt and self._should_encrypt(key):
try:
value = self._decrypt(value)
except Exception:
pass
# Try to parse JSON
if value and (value.startswith('[') or value.startswith('{')):
try:
value = json.loads(value)
except json.JSONDecodeError:
pass
result[key] = value
return result
def init_defaults(self) -> None:
"""
Initialize default settings if they don't exist.
This should be called on first app startup to populate default values.
"""
defaults = {
# SMTP settings
'smtp_server': 'localhost',
'smtp_port': 587,
'smtp_username': '',
'smtp_password': '',
'smtp_from_email': 'noreply@sneakyscanner.local',
'smtp_to_emails': [],
# Authentication
'app_password': '', # Will need to be set by user
# Retention policy
'retention_days': 0, # 0 = keep forever
# Alert settings
'cert_expiry_threshold': 30, # Days before expiry to alert
'email_alerts_enabled': False,
}
for key, value in defaults.items():
# Only set if doesn't exist
if self.db.query(Setting).filter_by(key=key).first() is None:
self.set(key, value)
class PasswordManager:
"""
Manages password hashing and verification using bcrypt.
Used for the single-user authentication system.
"""
@staticmethod
def hash_password(password: str) -> str:
"""
Hash a password using bcrypt.
Args:
password: Plain text password
Returns:
Bcrypt hash string
"""
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
@staticmethod
def verify_password(password: str, hashed: str) -> bool:
"""
Verify a password against a bcrypt hash.
Args:
password: Plain text password to verify
hashed: Bcrypt hash to check against
Returns:
True if password matches, False otherwise
"""
try:
return bcrypt.checkpw(password.encode(), hashed.encode())
except Exception:
return False
@staticmethod
def set_app_password(settings_manager: SettingsManager, password: str) -> None:
"""
Set the application password (stored as bcrypt hash).
Args:
settings_manager: SettingsManager instance
password: New password to set
"""
hashed = PasswordManager.hash_password(password)
# Password hash stored as regular setting (not encrypted, as it's already a hash)
settings_manager.set('app_password', hashed, encrypt=False)
@staticmethod
def verify_app_password(settings_manager: SettingsManager, password: str) -> bool:
"""
Verify the application password.
Args:
settings_manager: SettingsManager instance
password: Password to verify
Returns:
True if password matches, False otherwise
"""
stored_hash = settings_manager.get('app_password', decrypt=False)
if not stored_hash:
# No password set - should prompt user to create one
return False
return PasswordManager.verify_password(password, stored_hash)