Files
Code_of_Conquest/api/app/utils/auth.py
2025-11-24 23:10:55 -06:00

445 lines
13 KiB
Python

"""
Authentication Utilities
This module provides authentication middleware, decorators, and helper functions
for protecting routes and managing user sessions.
Usage:
from app.utils.auth import require_auth, require_tier, get_current_user
@app.route('/protected')
@require_auth
def protected_route():
user = get_current_user()
return f"Hello, {user.name}!"
@app.route('/premium-feature')
@require_auth
@require_tier('premium')
def premium_feature():
return "Premium content"
"""
from functools import wraps
from typing import Optional, Callable
from flask import request, g, jsonify, redirect, url_for
from app.services.appwrite_service import AppwriteService, UserData
from app.utils.response import unauthorized_response, forbidden_response
from app.utils.logging import get_logger
from app.config import get_config
from appwrite.exception import AppwriteException
# Initialize logger
logger = get_logger(__file__)
def extract_session_token() -> Optional[str]:
"""
Extract the session token from the request cookie.
Returns:
Session token string if found, None otherwise
"""
config = get_config()
cookie_name = config.auth.cookie_name
token = request.cookies.get(cookie_name)
return token
def verify_session(token: str) -> Optional[UserData]:
"""
Verify a session token and return the associated user data.
This function:
1. Validates the session token with Appwrite
2. Checks if the session is still active (not expired)
3. Retrieves and returns the user data
Args:
token: Session token from cookie
Returns:
UserData object if session is valid, None otherwise
"""
try:
appwrite = AppwriteService()
# Validate session
session_data = appwrite.get_session(session_id=token)
# Get user data
user_data = appwrite.get_user(user_id=session_data.user_id)
return user_data
except AppwriteException as e:
logger.warning("Session verification failed", error=str(e), code=e.code)
return None
except Exception as e:
logger.error("Unexpected error during session verification", error=str(e))
return None
def get_current_user() -> Optional[UserData]:
"""
Get the current authenticated user from the request context.
This function retrieves the user object that was attached to the
request context by the @require_auth decorator.
Returns:
UserData object if user is authenticated, None otherwise
Usage:
@app.route('/profile')
@require_auth
def profile():
user = get_current_user()
return f"Welcome, {user.name}!"
"""
return getattr(g, 'current_user', None)
def require_auth(f: Callable) -> Callable:
"""
Decorator to require authentication for a route (API endpoints).
This decorator:
1. Extracts the session token from the cookie
2. Verifies the session with Appwrite
3. Attaches the user object to the request context (g.current_user)
4. Allows the request to proceed if authenticated
5. Returns 401 Unauthorized JSON if not authenticated
For web views, use @require_auth_web instead.
Args:
f: The Flask route function to wrap
Returns:
Wrapped function with authentication check
Usage:
@app.route('/api/protected')
@require_auth
def protected_route():
user = get_current_user()
return f"Hello, {user.name}!"
"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Extract session token from cookie
token = extract_session_token()
if not token:
logger.warning("Authentication required but no session token provided", path=request.path)
return unauthorized_response(message="Authentication required. Please log in.")
# Verify session and get user
user = verify_session(token)
if not user:
logger.warning("Invalid or expired session token", path=request.path)
return unauthorized_response(message="Session invalid or expired. Please log in again.")
# Attach user to request context
g.current_user = user
logger.debug("User authenticated", user_id=user.id, email=user.email, path=request.path)
# Call the original function
return f(*args, **kwargs)
return decorated_function
def require_auth_web(f: Callable) -> Callable:
"""
Decorator to require authentication for a web view route.
This decorator:
1. Extracts the session token from the cookie
2. Verifies the session with Appwrite
3. Attaches the user object to the request context (g.current_user)
4. Allows the request to proceed if authenticated
5. Redirects to login page if not authenticated
For API endpoints, use @require_auth instead.
Args:
f: The Flask route function to wrap
Returns:
Wrapped function with authentication check
Usage:
@app.route('/dashboard')
@require_auth_web
def dashboard():
user = get_current_user()
return render_template('dashboard.html', user=user)
"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Extract session token from cookie
token = extract_session_token()
if not token:
logger.warning("Authentication required but no session token provided", path=request.path)
return redirect(url_for('auth_views.login'))
# Verify session and get user
user = verify_session(token)
if not user:
logger.warning("Invalid or expired session token", path=request.path)
return redirect(url_for('auth_views.login'))
# Attach user to request context
g.current_user = user
logger.debug("User authenticated", user_id=user.id, email=user.email, path=request.path)
# Call the original function
return f(*args, **kwargs)
return decorated_function
def require_tier(minimum_tier: str) -> Callable:
"""
Decorator to require a minimum subscription tier for a route.
This decorator must be used AFTER @require_auth.
Tier hierarchy (from lowest to highest):
- free
- basic
- premium
- elite
Args:
minimum_tier: Minimum required tier (free, basic, premium, elite)
Returns:
Decorator function
Raises:
ValueError: If minimum_tier is invalid
Usage:
@app.route('/premium-feature')
@require_auth
@require_tier('premium')
def premium_feature():
return "Premium content"
"""
# Define tier hierarchy
tier_hierarchy = {
'free': 0,
'basic': 1,
'premium': 2,
'elite': 3
}
if minimum_tier not in tier_hierarchy:
raise ValueError(f"Invalid tier: {minimum_tier}. Must be one of {list(tier_hierarchy.keys())}")
def decorator(f: Callable) -> Callable:
@wraps(f)
def decorated_function(*args, **kwargs):
# Get current user (set by @require_auth)
user = get_current_user()
if not user:
logger.error("require_tier used without require_auth", path=request.path)
return unauthorized_response(message="Authentication required.")
# Get user's tier level
user_tier = user.tier
user_tier_level = tier_hierarchy.get(user_tier, 0)
required_tier_level = tier_hierarchy[minimum_tier]
# Check if user has sufficient tier
if user_tier_level < required_tier_level:
logger.warning(
"Access denied - insufficient tier",
user_id=user.id,
user_tier=user_tier,
required_tier=minimum_tier,
path=request.path
)
return forbidden_response(
message=f"This feature requires {minimum_tier.capitalize()} tier or higher. "
f"Your current tier: {user_tier.capitalize()}."
)
logger.debug(
"Tier requirement met",
user_id=user.id,
user_tier=user_tier,
required_tier=minimum_tier,
path=request.path
)
# Call the original function
return f(*args, **kwargs)
return decorated_function
return decorator
def require_email_verified(f: Callable) -> Callable:
"""
Decorator to require email verification for a route.
This decorator must be used AFTER @require_auth.
Args:
f: The Flask route function to wrap
Returns:
Wrapped function with email verification check
Usage:
@app.route('/verified-only')
@require_auth
@require_email_verified
def verified_only():
return "You can only see this if your email is verified"
"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Get current user (set by @require_auth)
user = get_current_user()
if not user:
logger.error("require_email_verified used without require_auth", path=request.path)
return unauthorized_response(message="Authentication required.")
# Check if email is verified
if not user.email_verified:
logger.warning(
"Access denied - email not verified",
user_id=user.id,
email=user.email,
path=request.path
)
return forbidden_response(
message="Email verification required. Please check your inbox and verify your email address."
)
logger.debug("Email verification confirmed", user_id=user.id, email=user.email, path=request.path)
# Call the original function
return f(*args, **kwargs)
return decorated_function
def optional_auth(f: Callable) -> Callable:
"""
Decorator for routes that optionally use authentication.
This decorator will attach the user to g.current_user if authenticated,
but will NOT block the request if not authenticated. Use this for routes
that should behave differently based on authentication status.
Args:
f: The Flask route function to wrap
Returns:
Wrapped function with optional authentication
Usage:
@app.route('/landing')
@optional_auth
def landing():
user = get_current_user()
if user:
return f"Welcome back, {user.name}!"
else:
return "Welcome! Please log in."
"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Extract session token from cookie
token = extract_session_token()
if token:
# Verify session and get user
user = verify_session(token)
if user:
# Attach user to request context
g.current_user = user
logger.debug("Optional auth - user authenticated", user_id=user.id, path=request.path)
else:
logger.debug("Optional auth - invalid session", path=request.path)
else:
logger.debug("Optional auth - no session token", path=request.path)
# Call the original function regardless of authentication
return f(*args, **kwargs)
return decorated_function
def get_user_tier() -> str:
"""
Get the current user's tier.
Returns:
Tier string (free, basic, premium, elite), defaults to 'free' if not authenticated
Usage:
@app.route('/dashboard')
@require_auth
def dashboard():
tier = get_user_tier()
return f"Your tier: {tier}"
"""
user = get_current_user()
if user:
return user.tier
return 'free'
def is_tier_sufficient(required_tier: str) -> bool:
"""
Check if the current user's tier meets the requirement.
Args:
required_tier: Required tier level
Returns:
True if user's tier is sufficient, False otherwise
Usage:
@app.route('/feature')
@require_auth
def feature():
if is_tier_sufficient('premium'):
return "Premium features enabled"
else:
return "Upgrade to premium for more features"
"""
tier_hierarchy = {
'free': 0,
'basic': 1,
'premium': 2,
'elite': 3
}
user = get_current_user()
if not user:
return False
user_tier_level = tier_hierarchy.get(user.tier, 0)
required_tier_level = tier_hierarchy.get(required_tier, 0)
return user_tier_level >= required_tier_level