""" Flask application factory for SneakyScanner web interface. This module creates and configures the Flask application with all necessary extensions, blueprints, and middleware. """ import logging import os import uuid from logging.handlers import RotatingFileHandler from pathlib import Path from flask import Flask, g, jsonify, request from flask_cors import CORS from flask_login import LoginManager, current_user from sqlalchemy import create_engine, event from sqlalchemy.orm import scoped_session, sessionmaker from web.models import Base class RequestIDLogFilter(logging.Filter): """ Logging filter that injects request ID into log records. Adds a 'request_id' attribute to each log record. For requests within Flask request context, uses the request ID from g.request_id. For logs outside request context (background jobs, startup), uses 'system'. """ def filter(self, record): """Add request_id to log record.""" try: # Try to get request ID from Flask's g object record.request_id = g.get('request_id', 'system') except (RuntimeError, AttributeError): # Outside of request context record.request_id = 'system' return True def create_app(config: dict = None) -> Flask: """ Create and configure the Flask application. Args: config: Optional configuration dictionary to override defaults Returns: Configured Flask application instance """ app = Flask(__name__, instance_relative_config=True, static_folder='static', template_folder='templates') # Load default configuration app.config.from_mapping( SECRET_KEY=os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production'), SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL', 'sqlite:///./sneakyscanner.db'), SQLALCHEMY_TRACK_MODIFICATIONS=False, JSON_SORT_KEYS=False, # Preserve order in JSON responses MAX_CONTENT_LENGTH=50 * 1024 * 1024, # 50MB max upload size ) # Override with custom config if provided if config: app.config.update(config) # Ensure instance folder exists try: os.makedirs(app.instance_path, exist_ok=True) except OSError: pass # Configure logging configure_logging(app) # Initialize database init_database(app) # Initialize extensions init_extensions(app) # Initialize authentication init_authentication(app) # Initialize background scheduler init_scheduler(app) # Register blueprints register_blueprints(app) # Register error handlers register_error_handlers(app) # Add request/response handlers register_request_handlers(app) app.logger.info("SneakyScanner Flask app initialized") return app def configure_logging(app: Flask) -> None: """ Configure application logging with rotation and structured format. Args: app: Flask application instance """ # Set log level from environment or default to INFO log_level = os.environ.get('LOG_LEVEL', 'INFO').upper() app.logger.setLevel(getattr(logging, log_level, logging.INFO)) # Create logs directory if it doesn't exist log_dir = Path('logs') log_dir.mkdir(exist_ok=True) # Rotating file handler for application logs # Max 10MB per file, keep 10 backup files (100MB total) app_log_handler = RotatingFileHandler( log_dir / 'sneakyscanner.log', maxBytes=10 * 1024 * 1024, # 10MB backupCount=10 ) app_log_handler.setLevel(logging.INFO) # Structured log format with more context log_formatter = logging.Formatter( '%(asctime)s [%(levelname)s] %(name)s [%(request_id)s] ' '%(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) app_log_handler.setFormatter(log_formatter) # Add filter to inject request ID into log records app_log_handler.addFilter(RequestIDLogFilter()) app.logger.addHandler(app_log_handler) # Separate rotating file handler for errors only error_log_handler = RotatingFileHandler( log_dir / 'sneakyscanner_errors.log', maxBytes=10 * 1024 * 1024, # 10MB backupCount=5 ) error_log_handler.setLevel(logging.ERROR) error_formatter = logging.Formatter( '%(asctime)s [%(levelname)s] %(name)s [%(request_id)s]\n' 'Message: %(message)s\n' 'Path: %(pathname)s:%(lineno)d\n' '%(stack_info)s', datefmt='%Y-%m-%d %H:%M:%S' ) error_log_handler.setFormatter(error_formatter) error_log_handler.addFilter(RequestIDLogFilter()) app.logger.addHandler(error_log_handler) # Console handler for development if app.debug: console_handler = logging.StreamHandler() console_handler.setLevel(logging.DEBUG) console_formatter = logging.Formatter( '%(asctime)s [%(levelname)s] %(name)s [%(request_id)s] %(message)s', datefmt='%H:%M:%S' ) console_handler.setFormatter(console_formatter) console_handler.addFilter(RequestIDLogFilter()) app.logger.addHandler(console_handler) app.logger.info("Logging configured with rotation (10MB per file, 10 backups)") def init_database(app: Flask) -> None: """ Initialize database connection and session management. Args: app: Flask application instance """ # Determine connect_args based on database type connect_args = {} if 'sqlite' in app.config['SQLALCHEMY_DATABASE_URI']: # SQLite-specific configuration for better concurrency connect_args = { 'timeout': 15, # 15 second timeout for database locks 'check_same_thread': False # Allow SQLite usage across threads } # Create engine engine = create_engine( app.config['SQLALCHEMY_DATABASE_URI'], echo=app.debug, # Log SQL in debug mode pool_pre_ping=True, # Verify connections before using pool_recycle=3600, # Recycle connections after 1 hour connect_args=connect_args ) # Enable WAL mode for SQLite (better concurrency) if 'sqlite' in app.config['SQLALCHEMY_DATABASE_URI']: @event.listens_for(engine, "connect") def set_sqlite_pragma(dbapi_conn, connection_record): """Set SQLite pragmas for better performance and concurrency.""" cursor = dbapi_conn.cursor() cursor.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging cursor.execute("PRAGMA synchronous=NORMAL") # Faster writes cursor.execute("PRAGMA busy_timeout=15000") # 15 second busy timeout cursor.close() # Create scoped session factory db_session = scoped_session( sessionmaker( autocommit=False, autoflush=False, bind=engine ) ) # Store session in app for use in views app.db_session = db_session # Create tables if they don't exist (for development) # In production, use Alembic migrations instead if app.debug: Base.metadata.create_all(bind=engine) @app.teardown_appcontext def shutdown_session(exception=None): """ Remove database session at end of request. Rollback on exception to prevent partial commits. """ if exception: app.logger.warning(f"Request ended with exception, rolling back database session") db_session.rollback() db_session.remove() app.logger.info(f"Database initialized: {app.config['SQLALCHEMY_DATABASE_URI']}") def init_extensions(app: Flask) -> None: """ Initialize Flask extensions. Args: app: Flask application instance """ # CORS support for API CORS(app, resources={ r"/api/*": { "origins": os.environ.get('CORS_ORIGINS', '*').split(','), "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"], "allow_headers": ["Content-Type", "Authorization"], } }) app.logger.info("Extensions initialized") def init_authentication(app: Flask) -> None: """ Initialize Flask-Login authentication. Args: app: Flask application instance """ from web.auth.models import User # Initialize LoginManager login_manager = LoginManager() login_manager.init_app(app) # Configure login view login_manager.login_view = 'auth.login' login_manager.login_message = 'Please log in to access this page.' login_manager.login_message_category = 'info' # User loader callback @login_manager.user_loader def load_user(user_id): """Load user by ID for Flask-Login.""" return User.get(user_id, app.db_session) app.logger.info("Authentication initialized") def init_scheduler(app: Flask) -> None: """ Initialize background job scheduler. Args: app: Flask application instance """ from web.services.scheduler_service import SchedulerService # Create and initialize scheduler scheduler = SchedulerService() scheduler.init_scheduler(app) # Store in app context for access from routes app.scheduler = scheduler app.logger.info("Background scheduler initialized") def register_blueprints(app: Flask) -> None: """ Register Flask blueprints for different app sections. Args: app: Flask application instance """ # Import blueprints from web.api.scans import bp as scans_bp from web.api.schedules import bp as schedules_bp from web.api.alerts import bp as alerts_bp from web.api.settings import bp as settings_bp from web.auth.routes import bp as auth_bp from web.routes.main import bp as main_bp # Register authentication blueprint app.register_blueprint(auth_bp, url_prefix='/auth') # Register main web routes blueprint app.register_blueprint(main_bp, url_prefix='/') # Register API blueprints app.register_blueprint(scans_bp, url_prefix='/api/scans') app.register_blueprint(schedules_bp, url_prefix='/api/schedules') app.register_blueprint(alerts_bp, url_prefix='/api/alerts') app.register_blueprint(settings_bp, url_prefix='/api/settings') app.logger.info("Blueprints registered") def register_error_handlers(app: Flask) -> None: """ Register error handlers for common HTTP errors. Handles errors with either JSON responses (for API requests) or HTML templates (for web requests). Ensures database rollback on errors. Args: app: Flask application instance """ from flask import render_template from sqlalchemy.exc import SQLAlchemyError def wants_json(): """Check if client wants JSON response.""" # API requests always get JSON if request.path.startswith('/api/'): return True # Check Accept header best = request.accept_mimetypes.best_match(['application/json', 'text/html']) return best == 'application/json' and \ request.accept_mimetypes[best] > request.accept_mimetypes['text/html'] @app.errorhandler(400) def bad_request(error): """Handle 400 Bad Request errors.""" app.logger.warning(f"Bad request: {request.path} - {str(error)}") if wants_json(): return jsonify({ 'error': 'Bad Request', 'message': str(error) or 'The request was invalid' }), 400 return render_template('errors/400.html', error=error), 400 @app.errorhandler(401) def unauthorized(error): """Handle 401 Unauthorized errors.""" app.logger.warning(f"Unauthorized access attempt: {request.path}") if wants_json(): return jsonify({ 'error': 'Unauthorized', 'message': 'Authentication required' }), 401 return render_template('errors/401.html', error=error), 401 @app.errorhandler(403) def forbidden(error): """Handle 403 Forbidden errors.""" app.logger.warning(f"Forbidden access: {request.path}") if wants_json(): return jsonify({ 'error': 'Forbidden', 'message': 'You do not have permission to access this resource' }), 403 return render_template('errors/403.html', error=error), 403 @app.errorhandler(404) def not_found(error): """Handle 404 Not Found errors.""" app.logger.info(f"Resource not found: {request.path}") if wants_json(): return jsonify({ 'error': 'Not Found', 'message': 'The requested resource was not found' }), 404 return render_template('errors/404.html', error=error), 404 @app.errorhandler(405) def method_not_allowed(error): """Handle 405 Method Not Allowed errors.""" app.logger.warning(f"Method not allowed: {request.method} {request.path}") if wants_json(): return jsonify({ 'error': 'Method Not Allowed', 'message': 'The HTTP method is not allowed for this endpoint' }), 405 return render_template('errors/405.html', error=error), 405 @app.errorhandler(500) def internal_server_error(error): """ Handle 500 Internal Server Error. Rolls back database session and logs full traceback. """ # Rollback database session on error try: app.db_session.rollback() except Exception as e: app.logger.error(f"Failed to rollback database session: {str(e)}") # Log error with full context app.logger.error( f"Internal server error: {request.method} {request.path} - {str(error)}", exc_info=True ) if wants_json(): return jsonify({ 'error': 'Internal Server Error', 'message': 'An unexpected error occurred' }), 500 return render_template('errors/500.html', error=error), 500 @app.errorhandler(SQLAlchemyError) def handle_db_error(error): """ Handle database errors. Rolls back transaction and returns appropriate error response. """ # Rollback database session try: app.db_session.rollback() except Exception as e: app.logger.error(f"Failed to rollback database session: {str(e)}") # Log database error app.logger.error( f"Database error: {request.method} {request.path} - {str(error)}", exc_info=True ) if wants_json(): return jsonify({ 'error': 'Database Error', 'message': 'A database error occurred' }), 500 return render_template('errors/500.html', error=error), 500 def register_request_handlers(app: Flask) -> None: """ Register request and response handlers. Adds request ID generation, request/response logging with timing, and security headers. Args: app: Flask application instance """ import time @app.before_request def before_request_handler(): """ Generate request ID and start timing. Sets g.request_id and g.request_start_time for use in logging and timing calculations. """ # Generate unique request ID g.request_id = str(uuid.uuid4())[:8] # Short ID for readability g.request_start_time = time.time() # Log incoming request with context user_info = 'anonymous' if current_user.is_authenticated: user_info = f'user:{current_user.get_id()}' # Log at INFO level for API calls, DEBUG for other requests if request.path.startswith('/api/'): app.logger.info( f"→ {request.method} {request.path} " f"from={request.remote_addr} user={user_info}" ) elif app.debug: app.logger.debug( f"→ {request.method} {request.path} " f"from={request.remote_addr}" ) @app.after_request def after_request_handler(response): """ Log response and add security headers. Calculates request duration and logs response status. """ # Calculate request duration if hasattr(g, 'request_start_time'): duration_ms = (time.time() - g.request_start_time) * 1000 # Log response with duration if request.path.startswith('/api/'): # Log API responses at INFO level app.logger.info( f"← {request.method} {request.path} " f"status={response.status_code} " f"duration={duration_ms:.2f}ms" ) elif app.debug: # Log web responses at DEBUG level in debug mode app.logger.debug( f"← {request.method} {request.path} " f"status={response.status_code} " f"duration={duration_ms:.2f}ms" ) # Add duration header for API responses if request.path.startswith('/api/'): response.headers['X-Request-Duration-Ms'] = f"{duration_ms:.2f}" response.headers['X-Request-ID'] = g.request_id # Add security headers to all responses if request.path.startswith('/api/'): response.headers['X-Content-Type-Options'] = 'nosniff' response.headers['X-Frame-Options'] = 'DENY' response.headers['X-XSS-Protection'] = '1; mode=block' return response @app.teardown_request def teardown_request_handler(exception=None): """ Log errors that occur during request processing. Args: exception: Exception that occurred, if any """ if exception: app.logger.error( f"Request failed: {request.method} {request.path} " f"error={type(exception).__name__}: {str(exception)}", exc_info=True ) # Development server entry point def main(): """Run development server.""" app = create_app() app.run( host=os.environ.get('FLASK_HOST', '0.0.0.0'), port=int(os.environ.get('FLASK_PORT', 5000)), debug=os.environ.get('FLASK_DEBUG', 'True').lower() == 'true' ) if __name__ == '__main__': main()