""" Authentication Utilities This module provides authentication middleware, decorators, and helper functions for protecting routes and managing user sessions. Usage: from app.utils.auth import require_auth, require_tier, get_current_user @app.route('/protected') @require_auth def protected_route(): user = get_current_user() return f"Hello, {user.name}!" @app.route('/premium-feature') @require_auth @require_tier('premium') def premium_feature(): return "Premium content" """ from functools import wraps from typing import Optional, Callable from flask import request, g, jsonify, redirect, url_for from app.services.appwrite_service import AppwriteService, UserData from app.services.session_cache_service import SessionCacheService from app.utils.response import unauthorized_response, forbidden_response from app.utils.logging import get_logger from app.config import get_config from appwrite.exception import AppwriteException # Initialize logger logger = get_logger(__file__) def extract_session_token() -> Optional[str]: """ Extract the session token from the request cookie. Returns: Session token string if found, None otherwise """ config = get_config() cookie_name = config.auth.cookie_name token = request.cookies.get(cookie_name) return token def verify_session(token: str) -> Optional[UserData]: """ Verify a session token and return the associated user data. This function: 1. Checks the Redis session cache for a valid cached session 2. On cache miss, validates the session token with Appwrite 3. Caches the validated session for future requests 4. Returns the user data if valid The session cache reduces Appwrite API calls by ~90% by caching validated sessions for a configurable TTL (default: 5 minutes). Args: token: Session token from cookie Returns: UserData object if session is valid, None otherwise """ # Try cache first (reduces Appwrite calls by ~90%) cache = SessionCacheService() cached_user = cache.get(token) if cached_user is not None: return cached_user # Cache miss - validate with Appwrite try: appwrite = AppwriteService() # Validate session session_data = appwrite.get_session(session_id=token) # Get user data user_data = appwrite.get_user(user_id=session_data.user_id) # Cache the validated session cache.set(token, user_data, session_data.expire) return user_data except AppwriteException as e: logger.warning("Session verification failed", error=str(e), code=e.code) return None except Exception as e: logger.error("Unexpected error during session verification", error=str(e)) return None def get_current_user() -> Optional[UserData]: """ Get the current authenticated user from the request context. This function retrieves the user object that was attached to the request context by the @require_auth decorator. Returns: UserData object if user is authenticated, None otherwise Usage: @app.route('/profile') @require_auth def profile(): user = get_current_user() return f"Welcome, {user.name}!" """ return getattr(g, 'current_user', None) def require_auth(f: Callable) -> Callable: """ Decorator to require authentication for a route (API endpoints). This decorator: 1. Extracts the session token from the cookie 2. Verifies the session with Appwrite 3. Attaches the user object to the request context (g.current_user) 4. Allows the request to proceed if authenticated 5. Returns 401 Unauthorized JSON if not authenticated For web views, use @require_auth_web instead. Args: f: The Flask route function to wrap Returns: Wrapped function with authentication check Usage: @app.route('/api/protected') @require_auth def protected_route(): user = get_current_user() return f"Hello, {user.name}!" """ @wraps(f) def decorated_function(*args, **kwargs): # Extract session token from cookie token = extract_session_token() if not token: logger.warning("Authentication required but no session token provided", path=request.path) return unauthorized_response(message="Authentication required. Please log in.") # Verify session and get user user = verify_session(token) if not user: logger.warning("Invalid or expired session token", path=request.path) return unauthorized_response(message="Session invalid or expired. Please log in again.") # Attach user to request context g.current_user = user logger.debug("User authenticated", user_id=user.id, email=user.email, path=request.path) # Call the original function return f(*args, **kwargs) return decorated_function def require_auth_web(f: Callable) -> Callable: """ Decorator to require authentication for a web view route. This decorator: 1. Extracts the session token from the cookie 2. Verifies the session with Appwrite 3. Attaches the user object to the request context (g.current_user) 4. Allows the request to proceed if authenticated 5. Redirects to login page if not authenticated For API endpoints, use @require_auth instead. Args: f: The Flask route function to wrap Returns: Wrapped function with authentication check Usage: @app.route('/dashboard') @require_auth_web def dashboard(): user = get_current_user() return render_template('dashboard.html', user=user) """ @wraps(f) def decorated_function(*args, **kwargs): # Extract session token from cookie token = extract_session_token() if not token: logger.warning("Authentication required but no session token provided", path=request.path) return redirect(url_for('auth_views.login')) # Verify session and get user user = verify_session(token) if not user: logger.warning("Invalid or expired session token", path=request.path) return redirect(url_for('auth_views.login')) # Attach user to request context g.current_user = user logger.debug("User authenticated", user_id=user.id, email=user.email, path=request.path) # Call the original function return f(*args, **kwargs) return decorated_function def require_tier(minimum_tier: str) -> Callable: """ Decorator to require a minimum subscription tier for a route. This decorator must be used AFTER @require_auth. Tier hierarchy (from lowest to highest): - free - basic - premium - elite Args: minimum_tier: Minimum required tier (free, basic, premium, elite) Returns: Decorator function Raises: ValueError: If minimum_tier is invalid Usage: @app.route('/premium-feature') @require_auth @require_tier('premium') def premium_feature(): return "Premium content" """ # Define tier hierarchy tier_hierarchy = { 'free': 0, 'basic': 1, 'premium': 2, 'elite': 3 } if minimum_tier not in tier_hierarchy: raise ValueError(f"Invalid tier: {minimum_tier}. Must be one of {list(tier_hierarchy.keys())}") def decorator(f: Callable) -> Callable: @wraps(f) def decorated_function(*args, **kwargs): # Get current user (set by @require_auth) user = get_current_user() if not user: logger.error("require_tier used without require_auth", path=request.path) return unauthorized_response(message="Authentication required.") # Get user's tier level user_tier = user.tier user_tier_level = tier_hierarchy.get(user_tier, 0) required_tier_level = tier_hierarchy[minimum_tier] # Check if user has sufficient tier if user_tier_level < required_tier_level: logger.warning( "Access denied - insufficient tier", user_id=user.id, user_tier=user_tier, required_tier=minimum_tier, path=request.path ) return forbidden_response( message=f"This feature requires {minimum_tier.capitalize()} tier or higher. " f"Your current tier: {user_tier.capitalize()}." ) logger.debug( "Tier requirement met", user_id=user.id, user_tier=user_tier, required_tier=minimum_tier, path=request.path ) # Call the original function return f(*args, **kwargs) return decorated_function return decorator def require_email_verified(f: Callable) -> Callable: """ Decorator to require email verification for a route. This decorator must be used AFTER @require_auth. Args: f: The Flask route function to wrap Returns: Wrapped function with email verification check Usage: @app.route('/verified-only') @require_auth @require_email_verified def verified_only(): return "You can only see this if your email is verified" """ @wraps(f) def decorated_function(*args, **kwargs): # Get current user (set by @require_auth) user = get_current_user() if not user: logger.error("require_email_verified used without require_auth", path=request.path) return unauthorized_response(message="Authentication required.") # Check if email is verified if not user.email_verified: logger.warning( "Access denied - email not verified", user_id=user.id, email=user.email, path=request.path ) return forbidden_response( message="Email verification required. Please check your inbox and verify your email address." ) logger.debug("Email verification confirmed", user_id=user.id, email=user.email, path=request.path) # Call the original function return f(*args, **kwargs) return decorated_function def optional_auth(f: Callable) -> Callable: """ Decorator for routes that optionally use authentication. This decorator will attach the user to g.current_user if authenticated, but will NOT block the request if not authenticated. Use this for routes that should behave differently based on authentication status. Args: f: The Flask route function to wrap Returns: Wrapped function with optional authentication Usage: @app.route('/landing') @optional_auth def landing(): user = get_current_user() if user: return f"Welcome back, {user.name}!" else: return "Welcome! Please log in." """ @wraps(f) def decorated_function(*args, **kwargs): # Extract session token from cookie token = extract_session_token() if token: # Verify session and get user user = verify_session(token) if user: # Attach user to request context g.current_user = user logger.debug("Optional auth - user authenticated", user_id=user.id, path=request.path) else: logger.debug("Optional auth - invalid session", path=request.path) else: logger.debug("Optional auth - no session token", path=request.path) # Call the original function regardless of authentication return f(*args, **kwargs) return decorated_function def get_user_tier() -> str: """ Get the current user's tier. Returns: Tier string (free, basic, premium, elite), defaults to 'free' if not authenticated Usage: @app.route('/dashboard') @require_auth def dashboard(): tier = get_user_tier() return f"Your tier: {tier}" """ user = get_current_user() if user: return user.tier return 'free' def is_tier_sufficient(required_tier: str) -> bool: """ Check if the current user's tier meets the requirement. Args: required_tier: Required tier level Returns: True if user's tier is sufficient, False otherwise Usage: @app.route('/feature') @require_auth def feature(): if is_tier_sufficient('premium'): return "Premium features enabled" else: return "Upgrade to premium for more features" """ tier_hierarchy = { 'free': 0, 'basic': 1, 'premium': 2, 'elite': 3 } user = get_current_user() if not user: return False user_tier_level = tier_hierarchy.get(user.tier, 0) required_tier_level = tier_hierarchy.get(required_tier, 0) return user_tier_level >= required_tier_level