Implemented comprehensive Flask-Login authentication with single-user support. New Features: - Flask-Login integration with User model - Bcrypt password hashing via PasswordManager - Login, logout, and initial password setup routes - @login_required and @api_auth_required decorators - All API endpoints now require authentication - Bootstrap 5 dark theme UI templates - Dashboard with navigation - Remember me and next parameter redirect support Files Created (12): - web/auth/__init__.py, models.py, decorators.py, routes.py - web/routes/__init__.py, main.py - web/templates/login.html, setup.html, dashboard.html, scans.html, scan_detail.html - tests/test_authentication.py (30+ tests) Files Modified (6): - web/app.py: Added Flask-Login initialization and main routes - web/api/scans.py: Protected all endpoints with @api_auth_required - web/api/settings.py: Protected all endpoints with @api_auth_required - web/api/schedules.py: Protected all endpoints with @api_auth_required - web/api/alerts.py: Protected all endpoints with @api_auth_required - tests/conftest.py: Added authentication test fixtures Security: - Session-based authentication for both web UI and API - Secure password storage with bcrypt - Protected routes redirect to login page - Protected API endpoints return 401 Unauthorized - Health check endpoints remain accessible for monitoring Testing: - User model authentication and properties - Login success/failure flows - Logout and session management - Password setup workflow - API endpoint authentication requirements - Session persistence and remember me functionality - Next parameter redirect behavior Total: ~1,200 lines of code added 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
354 lines
9.7 KiB
Python
354 lines
9.7 KiB
Python
"""
|
|
Flask application factory for SneakyScanner web interface.

This module creates and configures the Flask application with all necessary
extensions, blueprints, and middleware.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from flask import Flask, jsonify
|
|
from flask_cors import CORS
|
|
from flask_login import LoginManager
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import scoped_session, sessionmaker
|
|
|
|
from web.models import Base
|
|
|
|
|
|
def create_app(config: dict = None) -> Flask:
    """
    Create and configure the Flask application.

    Applies the default settings (optionally overridden by ``config``) and
    then runs the initialization steps in a fixed order: logging, database,
    extensions, authentication, scheduler, blueprints, error handlers and
    request handlers.

    Args:
        config: Optional configuration dictionary to override defaults

    Returns:
        Configured Flask application instance
    """
    # Fallback used only when SECRET_KEY is absent from the environment.
    default_secret_key = 'dev-secret-key-change-in-production'

    app = Flask(__name__,
                instance_relative_config=True,
                static_folder='static',
                template_folder='templates')

    # Load default configuration
    app.config.from_mapping(
        SECRET_KEY=os.environ.get('SECRET_KEY', default_secret_key),
        SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL', 'sqlite:///./sneakyscanner.db'),
        SQLALCHEMY_TRACK_MODIFICATIONS=False,
        JSON_SORT_KEYS=False,  # Preserve order in JSON responses
        MAX_CONTENT_LENGTH=50 * 1024 * 1024,  # 50MB max upload size
    )

    # Override with custom config if provided
    if config:
        app.config.update(config)

    # Ensure instance folder exists. Failure stays non-fatal, but the error is
    # kept so it can be reported once logging has been configured.
    instance_dir_error = None
    try:
        os.makedirs(app.instance_path, exist_ok=True)
    except OSError as exc:
        instance_dir_error = exc

    # Configure logging
    configure_logging(app)

    if instance_dir_error is not None:
        app.logger.warning(
            "Could not create instance folder %s: %s",
            app.instance_path, instance_dir_error
        )

    # The fallback key is public in the source tree; sessions signed with it
    # can be forged, so make its use visible instead of silent.
    if app.config.get('SECRET_KEY') == default_secret_key:
        app.logger.warning(
            "SECRET_KEY is using the built-in development default; "
            "set the SECRET_KEY environment variable before deploying"
        )

    # Initialize database
    init_database(app)

    # Initialize extensions
    init_extensions(app)

    # Initialize authentication
    init_authentication(app)

    # Initialize background scheduler
    init_scheduler(app)

    # Register blueprints
    register_blueprints(app)

    # Register error handlers
    register_error_handlers(app)

    # Add request/response handlers
    register_request_handlers(app)

    app.logger.info("SneakyScanner Flask app initialized")

    return app
|
|
|
|
|
|
def configure_logging(app: Flask) -> None:
    """
    Configure application logging.

    Sets the level of ``app.logger`` from the ``LOG_LEVEL`` environment
    variable (default ``INFO``), attaches a file handler writing to
    ``logs/sneakyscanner.log`` and, when ``app.debug`` is true, a console
    handler as well.

    The function is safe to call more than once for the same logger: each
    handler is tagged with a name and is only attached when no handler with
    that name is present. Flask's ``app.logger`` is named after the app, so
    repeated factory calls would otherwise stack duplicate handlers on the
    same logger.

    Args:
        app: Flask application instance
    """
    file_handler_name = 'sneakyscanner-file'
    console_handler_name = 'sneakyscanner-console'

    # Set log level from environment or default to INFO. Upper-case names in
    # the logging module are not all levels (e.g. BASIC_FORMAT is a string),
    # so anything that is not an integer falls back to INFO.
    level_name = os.environ.get('LOG_LEVEL', 'INFO').upper()
    level = getattr(logging, level_name, logging.INFO)
    if not isinstance(level, int):
        level = logging.INFO
    app.logger.setLevel(level)

    installed = {handler.get_name() for handler in app.logger.handlers}

    # File handler for all logs
    if file_handler_name not in installed:
        # Create logs directory if it doesn't exist
        log_dir = Path('logs')
        log_dir.mkdir(exist_ok=True)

        file_handler = logging.FileHandler(log_dir / 'sneakyscanner.log')
        file_handler.set_name(file_handler_name)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s [%(levelname)s] %(name)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        ))
        app.logger.addHandler(file_handler)

    # Console handler for development
    if app.debug and console_handler_name not in installed:
        console_handler = logging.StreamHandler()
        console_handler.set_name(console_handler_name)
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(logging.Formatter(
            '[%(levelname)s] %(name)s: %(message)s'
        ))
        app.logger.addHandler(console_handler)
|
|
|
|
|
|
def init_database(app: Flask) -> None:
    """
    Initialize database connection and session management.

    Creates a SQLAlchemy engine from ``app.config['SQLALCHEMY_DATABASE_URI']``,
    exposes a scoped session as ``app.db_session`` and registers a teardown
    hook that removes the session when the application context ends. In
    debug mode the tables declared on ``Base`` are created if missing.

    Args:
        app: Flask application instance
    """
    # Create engine
    engine = create_engine(
        app.config['SQLALCHEMY_DATABASE_URI'],
        echo=app.debug,  # Log SQL in debug mode
        pool_pre_ping=True,  # Verify connections before using
        pool_recycle=3600,  # Recycle connections after 1 hour
    )

    # Create scoped session factory
    db_session = scoped_session(
        sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=engine
        )
    )

    # Store session in app for use in views
    app.db_session = db_session

    # Create tables if they don't exist (for development)
    # In production, use Alembic migrations instead
    if app.debug:
        Base.metadata.create_all(bind=engine)

    @app.teardown_appcontext
    def shutdown_session(exception=None):
        """Remove database session when the application context is torn down."""
        db_session.remove()

    # The configured URI may embed credentials; the repr of the engine URL
    # masks the password, so log that instead of the raw config string.
    app.logger.info("Database initialized: %r", engine.url)
|
|
|
|
|
|
def init_extensions(app: Flask) -> None:
    """
    Initialize Flask extensions.

    Enables CORS for routes under ``/api/``. Allowed origins are read from
    the comma-separated ``CORS_ORIGINS`` environment variable and default to
    ``*`` when it is unset.

    Args:
        app: Flask application instance
    """
    # Strip whitespace around each entry so a value such as
    # "https://a.example, https://b.example" does not yield an origin with a
    # leading space that can never match a request.
    raw_origins = os.environ.get('CORS_ORIGINS', '*')
    allowed_origins = [origin.strip() for origin in raw_origins.split(',')]

    # CORS support for API
    CORS(app, resources={
        r"/api/*": {
            "origins": allowed_origins,
            "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
            "allow_headers": ["Content-Type", "Authorization"],
        }
    })

    app.logger.info("Extensions initialized")
|
|
|
|
|
|
def init_authentication(app: Flask) -> None:
    """
    Set up Flask-Login for the application.

    Binds a ``LoginManager`` to ``app``, points unauthenticated users at the
    ``auth.login`` endpoint and registers the callback that resolves a stored
    user id into a ``User`` via ``app.db_session``.

    Args:
        app: Flask application instance
    """
    from web.auth.models import User

    def load_user(user_id):
        """Load user by ID for Flask-Login."""
        return User.get(user_id, app.db_session)

    manager = LoginManager()
    manager.init_app(app)

    # Where to send anonymous visitors, and what to flash when doing so.
    manager.login_view = 'auth.login'
    manager.login_message = 'Please log in to access this page.'
    manager.login_message_category = 'info'

    # Same registration the decorator form performs.
    manager.user_loader(load_user)

    app.logger.info("Authentication initialized")
|
|
|
|
|
|
def init_scheduler(app: Flask) -> None:
    """
    Start the background job scheduler and attach it to the app.

    Args:
        app: Flask application instance
    """
    from web.services.scheduler_service import SchedulerService

    service = SchedulerService()
    service.init_scheduler(app)

    # Exposed on the app object so route code can reach it as app.scheduler.
    app.scheduler = service

    app.logger.info("Background scheduler initialized")
|
|
|
|
|
|
def register_blueprints(app: Flask) -> None:
    """
    Attach every blueprint to the application under its URL prefix.

    The authentication and main web blueprints are registered first,
    followed by the API blueprints under ``/api/``.

    Args:
        app: Flask application instance
    """
    from web.api.scans import bp as scans_bp
    from web.api.schedules import bp as schedules_bp
    from web.api.alerts import bp as alerts_bp
    from web.api.settings import bp as settings_bp
    from web.auth.routes import bp as auth_bp
    from web.routes.main import bp as main_bp

    # (blueprint, prefix) pairs, listed in registration order.
    mounts = (
        (auth_bp, '/auth'),
        (main_bp, '/'),
        (scans_bp, '/api/scans'),
        (schedules_bp, '/api/schedules'),
        (alerts_bp, '/api/alerts'),
        (settings_bp, '/api/settings'),
    )
    for blueprint, prefix in mounts:
        app.register_blueprint(blueprint, url_prefix=prefix)

    app.logger.info("Blueprints registered")
|
|
|
|
|
|
def register_error_handlers(app: Flask) -> None:
    """
    Install JSON error responses for common HTTP status codes.

    Handlers are registered for 400, 401, 403, 404, 405 and 500. Each returns
    a JSON body with ``error`` and ``message`` keys together with the matching
    status code. The 400 handler uses the error's own text when it is
    non-empty; the 500 handler also logs the error.

    Args:
        app: Flask application instance
    """
    def respond(status, title, message):
        # Shared response shape: (JSON body, status code).
        return jsonify({
            'error': title,
            'message': message
        }), status

    def fixed(status, title, message):
        # Build a handler whose reply does not depend on the error object.
        def handler(error):
            return respond(status, title, message)
        return handler

    def bad_request(error):
        return respond(400, 'Bad Request', str(error) or 'The request was invalid')

    def internal_server_error(error):
        app.logger.error(f"Internal server error: {error}")
        return respond(500, 'Internal Server Error', 'An unexpected error occurred')

    handlers = (
        (400, bad_request),
        (401, fixed(401, 'Unauthorized', 'Authentication required')),
        (403, fixed(403, 'Forbidden',
                    'You do not have permission to access this resource')),
        (404, fixed(404, 'Not Found', 'The requested resource was not found')),
        (405, fixed(405, 'Method Not Allowed',
                    'The HTTP method is not allowed for this endpoint')),
        (500, internal_server_error),
    )
    for status, handler in handlers:
        app.register_error_handler(status, handler)
|
|
|
|
|
|
def register_request_handlers(app: Flask) -> None:
    """
    Register request and response handlers.

    Installs one ``before_request`` hook that logs the method and path of
    each request in debug mode, and one ``after_request`` hook that adds
    security headers to responses for paths under ``/api/``.

    Args:
        app: Flask application instance
    """
    # Bound before the hooks are defined so both closures can rely on it.
    from flask import request

    # Registered exactly once; a second registration would log every request
    # twice.
    @app.before_request
    def log_request():
        """Log incoming requests."""
        if app.debug:
            app.logger.debug(f"{request.method} {request.path}")

    @app.after_request
    def add_security_headers(response):
        """Add security headers to API responses."""
        # Only API routes receive these headers.
        if request.path.startswith('/api/'):
            response.headers['X-Content-Type-Options'] = 'nosniff'
            response.headers['X-Frame-Options'] = 'DENY'
            response.headers['X-XSS-Protection'] = '1; mode=block'

        return response
|
|
|
|
|
|
# Development server entry point
|
|
def main():
    """
    Run the development server.

    Host, port and debug flag come from the ``FLASK_HOST``, ``FLASK_PORT``
    and ``FLASK_DEBUG`` environment variables, defaulting to ``0.0.0.0``,
    ``5000`` and ``True`` respectively.
    """
    # NOTE(review): with no environment overrides this serves on all
    # interfaces with debug enabled - confirm this entry point is only ever
    # used for local development.
    app = create_app()

    env = os.environ
    bind_host = env.get('FLASK_HOST', '0.0.0.0')
    bind_port = int(env.get('FLASK_PORT', 5000))
    debug_enabled = env.get('FLASK_DEBUG', 'True').lower() == 'true'

    app.run(host=bind_host, port=bind_port, debug=debug_enabled)


if __name__ == '__main__':
    main()
|