Files
Code_of_Conquest/api/app/api/auth.py
Phillip Tarrant 8675f9bf75 feat(api): add Redis session cache to reduce Appwrite API calls by ~90%
- Add SessionCacheService with 5-minute TTL Redis cache
- Cache validated sessions to avoid redundant Appwrite calls
- Add /api/v1/auth/me endpoint for retrieving current user
- Invalidate cache on logout and password reset
- Add session_cache config to auth section (Redis db 2)
- Fix Docker Redis hostname (localhost -> redis)
- Handle timezone-aware datetime comparisons

Security: tokens hashed before use as cache keys, explicit
invalidation on logout/password change, graceful degradation
when Redis unavailable.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 22:01:14 -06:00

569 lines
17 KiB
Python

"""
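Authentication API Blueprint
This module provides API endpoints for user authentication and management:
- User registration
- User login/logout
- Current user lookup (/api/v1/auth/me)
- Email verification
- Password reset
The JSON API endpoints follow the standard API response format defined in app.utils.response.
The email verification endpoint instead flashes a message and redirects to the login page,
and the template routes at the end of the module render HTML pages.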
"""
import re
from flask import Blueprint, request, make_response, render_template, redirect, url_for, flash
from appwrite.exception import AppwriteException
from app.services.appwrite_service import AppwriteService
from app.services.session_cache_service import SessionCacheService
from app.utils.response import (
success_response,
created_response,
error_response,
unauthorized_response,
validation_error_response
)
from app.utils.auth import require_auth, get_current_user, extract_session_token
from app.utils.logging import get_logger
from app.config import get_config
# Module-level logger shared by every handler below; handlers pass context as
# keyword fields (e.g. user_id=..., error=...).
# NOTE(review): get_logger is given this file's path (__file__) rather than
# __name__ — confirm that is what app.utils.logging expects.
logger = get_logger(__file__)
# Blueprint that all routes in this module attach to. The 'auth' name is the
# endpoint prefix used by url_for('auth.login_page') in the handlers below.
auth_bp = Blueprint('auth', __name__)
# ===== VALIDATION FUNCTIONS =====
def validate_email(email: str) -> tuple[bool, str]:
    """
    Validate email address format.

    Checks, in order: presence, type, configured maximum length
    (``config.auth.email_max_length``), and a basic ``local@domain.tld`` shape.

    Args:
        email: Email address to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is "" when valid.
    """
    if not email:
        return False, "Email is required"

    # A non-string (e.g. a JSON number or list) can never be a valid address;
    # reject it here instead of letting len()/re raise TypeError.
    if not isinstance(email, str):
        return False, "Invalid email format"

    config = get_config()
    max_length = config.auth.email_max_length
    if len(email) > max_length:
        return False, f"Email must be no more than {max_length} characters"

    # Email regex pattern
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    # fullmatch, not match: with match() the "$" anchor also succeeds just
    # before a trailing newline, so "user@example.com\n" would be accepted.
    if not re.fullmatch(pattern, email):
        return False, "Invalid email format"

    return True, ""
def validate_password(password: str) -> tuple[bool, str]:
    """
    Check a password against the configured strength policy.

    Length is checked first (configured minimum, fixed maximum of 128); the
    character-class requirements enabled in ``config.auth`` are then checked
    together so that one message can list everything that is missing.

    Args:
        password: Candidate password

    Returns:
        Tuple of (is_valid, error_message); error_message is "" when valid.
    """
    if not password:
        return False, "Password is required"

    config = get_config()
    min_length = config.auth.password_min_length
    if len(password) < min_length:
        return False, f"Password must be at least {min_length} characters long"
    if len(password) > 128:
        return False, "Password must be no more than 128 characters"

    # (enabled flag, pattern that must be present, wording for the error)
    rules = (
        (config.auth.password_require_uppercase, r'[A-Z]',
         "at least one uppercase letter"),
        (config.auth.password_require_lowercase, r'[a-z]',
         "at least one lowercase letter"),
        (config.auth.password_require_number, r'[0-9]',
         "at least one number"),
        (config.auth.password_require_special, r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]',
         "at least one special character"),
    )
    missing = [
        wording
        for enabled, required_pattern, wording in rules
        if enabled and not re.search(required_pattern, password)
    ]
    if missing:
        return False, f"Password must contain {', '.join(missing)}"

    return True, ""
def validate_name(name: str) -> tuple[bool, str]:
    """
    Check a display name against the configured length limits and the
    allowed character set.

    Args:
        name: Name to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is "" when valid.
    """
    if not name:
        return False, "Name is required"

    auth_settings = get_config().auth
    shortest = auth_settings.name_min_length
    longest = auth_settings.name_max_length

    name_length = len(name)
    if name_length < shortest:
        return False, f"Name must be at least {shortest} characters"
    if name_length > longest:
        return False, f"Name must be no more than {longest} characters"

    # Letters, whitespace, hyphens and apostrophes only
    allowed = re.match(r"^[a-zA-Z\s\-']+$", name)
    if allowed is None:
        return False, "Name can only contain letters, spaces, hyphens, and apostrophes"

    return True, ""
# ===== API ENDPOINTS =====
@auth_bp.route('/api/v1/auth/register', methods=['POST'])
def api_register():
    """
    Register a new user account.

    Request Body:
        {
            "email": "user@example.com",
            "password": "SecurePass123!",
            "name": "Player Name"
        }

    The email is trimmed and lower-cased and the name is trimmed before
    validation. All three fields are validated and every failure is reported
    together, keyed by field name.

    Returns:
        201: User created successfully
        400: Validation error (including a missing, malformed or non-object
             JSON body) or email already exists
        500: Internal server error
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising an HTTP error that the catch-all handler below would
        # misreport as INTERNAL_ERROR.
        data = request.get_json(silent=True)
        if not data:
            return validation_error_response({"error": "Request body is required"})
        if not isinstance(data, dict):
            return validation_error_response({"error": "Request body must be a JSON object"})

        # JSON null or non-string values are treated as missing so the
        # validators report them, rather than .strip() raising AttributeError.
        raw_email = data.get('email')
        raw_password = data.get('password')
        raw_name = data.get('name')
        email = raw_email.strip().lower() if isinstance(raw_email, str) else ''
        password = raw_password if isinstance(raw_password, str) else ''
        name = raw_name.strip() if isinstance(raw_name, str) else ''

        # Validate inputs; collect every failure so the client sees them all
        validation_errors = {}

        email_valid, email_error = validate_email(email)
        if not email_valid:
            validation_errors['email'] = email_error

        password_valid, password_error = validate_password(password)
        if not password_valid:
            validation_errors['password'] = password_error

        name_valid, name_error = validate_name(name)
        if not name_valid:
            validation_errors['name'] = name_error

        if validation_errors:
            return validation_error_response(validation_errors)

        # Register user
        appwrite = AppwriteService()
        user_data = appwrite.register_user(email=email, password=password, name=name)

        logger.info("User registered successfully", user_id=user_data.id, email=email)

        return created_response(
            result={
                "user": user_data.to_dict(),
                "message": "Registration successful. Please check your email to verify your account."
            }
        )

    except AppwriteException as e:
        logger.error("Registration failed", error=str(e), code=e.code)

        # Check for specific error codes
        if e.code == 409:  # Conflict - user already exists
            return validation_error_response({"email": "An account with this email already exists"})

        return error_response(message="Registration failed. Please try again.", code="REGISTRATION_ERROR")

    except Exception as e:
        logger.error("Unexpected error during registration", error=str(e))
        return error_response(message="An unexpected error occurred", code="INTERNAL_ERROR")
@auth_bp.route('/api/v1/auth/login', methods=['POST'])
def api_login():
    """
    Authenticate a user and create a session.

    Request Body:
        {
            "email": "user@example.com",
            "password": "SecurePass123!",
            "remember_me": false
        }

    On success the session id returned by Appwrite is stored in the cookie
    named by ``config.auth.cookie_name``; its lifetime is
    ``config.auth.duration_remember_me`` when ``remember_me`` is truthy,
    otherwise ``config.auth.duration_normal``.

    Returns:
        200: Login successful, session cookie set
        401: Invalid credentials
        400: Validation error (including a missing, malformed or non-object
             JSON body)
        500: Internal server error
    """
    # Bound before the try block so the AppwriteException handler can always
    # log it without inspecting locals().
    email = 'unknown'
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising an HTTP error that the catch-all handler below would
        # misreport as INTERNAL_ERROR.
        data = request.get_json(silent=True)
        if not data:
            return validation_error_response({"error": "Request body is required"})
        if not isinstance(data, dict):
            return validation_error_response({"error": "Request body must be a JSON object"})

        # JSON null or non-string values are treated as missing instead of
        # letting .strip() raise AttributeError.
        raw_email = data.get('email')
        raw_password = data.get('password')
        email = raw_email.strip().lower() if isinstance(raw_email, str) else ''
        password = raw_password if isinstance(raw_password, str) else ''
        remember_me = data.get('remember_me', False)

        # Validate inputs
        if not email:
            return validation_error_response({"email": "Email is required"})

        if not password:
            return validation_error_response({"password": "Password is required"})

        # Authenticate user
        appwrite = AppwriteService()
        session_data, user_data = appwrite.login_user(email=email, password=password)

        logger.info("User logged in successfully", user_id=user_data.id, email=email)

        # Set session cookie
        config = get_config()
        duration = config.auth.duration_remember_me if remember_me else config.auth.duration_normal

        response = make_response(success_response(
            result={
                "user": user_data.to_dict(),
                "message": "Login successful"
            }
        ))
        response.set_cookie(
            key=config.auth.cookie_name,
            value=session_data.session_id,
            max_age=duration,
            httponly=config.auth.http_only,
            secure=config.auth.secure,
            samesite=config.auth.same_site,
            path=config.auth.path
        )
        return response

    except AppwriteException as e:
        logger.warning("Login failed", email=email, error=str(e), code=e.code)
        # Generic error message for security (don't reveal if email exists)
        return unauthorized_response(message="Invalid email or password")

    except Exception as e:
        logger.error("Unexpected error during login", error=str(e))
        return error_response(message="An unexpected error occurred", code="INTERNAL_ERROR")
@auth_bp.route('/api/v1/auth/logout', methods=['POST'])
@require_auth
def api_logout():
    """
    End the caller's session and expire the session cookie.

    The cached entry for the token is dropped first, then the session is
    deleted in Appwrite, and finally the cookie is overwritten with an empty
    value and ``max_age=0``.

    Returns:
        200: Logout successful, session cookie cleared
        401: Not authenticated
        500: Internal server error
    """
    try:
        session_token = extract_session_token()
        if not session_token:
            return unauthorized_response(message="No active session")

        # Drop the cached session before the Appwrite call
        SessionCacheService().invalidate_token(session_token)

        # Delete the session in Appwrite
        AppwriteService().logout_user(session_id=session_token)

        current = get_current_user()
        logger.info(
            "User logged out successfully",
            user_id=current.id if current else 'unknown',
        )

        # Expire the cookie using the same attributes it was set with
        auth_settings = get_config().auth
        logout_response = make_response(
            success_response(result={"message": "Logout successful"})
        )
        logout_response.set_cookie(
            key=auth_settings.cookie_name,
            value='',
            max_age=0,
            httponly=auth_settings.http_only,
            secure=auth_settings.secure,
            samesite=auth_settings.same_site,
            path=auth_settings.path,
        )
        return logout_response

    except AppwriteException as exc:
        logger.error("Logout failed", error=str(exc), code=exc.code)
        return error_response(message="Logout failed", code="LOGOUT_ERROR")

    except Exception as exc:
        logger.error("Unexpected error during logout", error=str(exc))
        return error_response(message="An unexpected error occurred", code="INTERNAL_ERROR")
@auth_bp.route('/api/v1/auth/me', methods=['GET'])
@require_auth
def api_get_current_user():
    """
    Return the profile of the currently authenticated user.

    Only reads the user object provided by ``get_current_user()`` and makes
    no service calls of its own.
    NOTE(review): intended to be cheap enough for frequent polling,
    presumably because the auth layer serves the session from cache —
    confirm in app.utils.auth.

    Returns:
        200: User data (id, email, name, email_verified, tier)
        401: Not authenticated
    """
    current = get_current_user()
    if current:
        profile = {
            "id": current.id,
            "email": current.email,
            "name": current.name,
            "email_verified": current.email_verified,
            "tier": current.tier,
        }
        return success_response(result=profile)

    return unauthorized_response(message="Not authenticated")
@auth_bp.route('/api/v1/auth/verify-email', methods=['GET'])
def api_verify_email():
    """
    Confirm an email address from the link sent to the user.

    Query Parameters:
        userId: User ID from verification link
        secret: Verification secret from verification link

    Returns:
        Always a redirect to the login page, with a flashed success or
        error message describing the outcome.
    """
    def _back_to_login(message, category):
        # Every outcome ends the same way: flash, then redirect to login.
        flash(message, category)
        return redirect(url_for('auth.login_page'))

    try:
        query = request.args
        user_id = query.get('userId')
        secret = query.get('secret')

        if not (user_id and secret):
            return _back_to_login("Invalid verification link", "error")

        AppwriteService().verify_email(user_id=user_id, secret=secret)
        logger.info("Email verified successfully", user_id=user_id)
        return _back_to_login("Email verified successfully! You can now log in.", "success")

    except AppwriteException as exc:
        logger.error("Email verification failed", error=str(exc), code=exc.code)
        return _back_to_login(
            "Email verification failed. The link may be invalid or expired.", "error"
        )

    except Exception as exc:
        logger.error("Unexpected error during email verification", error=str(exc))
        return _back_to_login("An unexpected error occurred", "error")
@auth_bp.route('/api/v1/auth/forgot-password', methods=['POST'])
def api_forgot_password():
    """
    Request a password reset email.

    Request Body:
        {
            "email": "user@example.com"
        }

    Returns:
        200: Always returned once the request is well-formed, whether or not
             the account exists (for security, don't reveal if email exists).
             Unexpected errors are logged and also answered with this
             response.
        400: Validation error (missing, malformed or non-object JSON body,
             or an invalid email)
    """
    # Single source for the deliberately uninformative success message
    generic_message = "If an account exists with this email, you will receive a password reset link shortly."

    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising an HTTP error that the catch-all handler below would turn
        # into a misleading 200 success.
        data = request.get_json(silent=True)
        if not data:
            return validation_error_response({"error": "Request body is required"})
        if not isinstance(data, dict):
            return validation_error_response({"error": "Request body must be a JSON object"})

        # JSON null or a non-string value is treated as missing instead of
        # letting .strip() raise AttributeError.
        raw_email = data.get('email')
        email = raw_email.strip().lower() if isinstance(raw_email, str) else ''

        # Validate email
        email_valid, email_error = validate_email(email)
        if not email_valid:
            return validation_error_response({"email": email_error})

        # Request password reset
        appwrite = AppwriteService()
        appwrite.request_password_reset(email=email)

        logger.info("Password reset requested", email=email)

        # Always return success for security
        return success_response(result={"message": generic_message})

    except Exception as e:
        logger.error("Unexpected error during password reset request", error=str(e))
        # Still return success for security
        return success_response(result={"message": generic_message})
@auth_bp.route('/api/v1/auth/reset-password', methods=['POST'])
def api_reset_password():
    """
    Confirm password reset and update password.

    Request Body:
        {
            "user_id": "user_id_from_link",
            "secret": "secret_from_link",
            "password": "NewSecurePass123!"
        }

    After Appwrite accepts the new password, the session cache is asked to
    invalidate every cached session for the user.

    Returns:
        200: Password reset successful
        400: Validation error (including a missing, malformed or non-object
             JSON body) or invalid/expired link
        500: Internal server error
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising an HTTP error that the catch-all handler below would
        # misreport as INTERNAL_ERROR.
        data = request.get_json(silent=True)
        if not data:
            return validation_error_response({"error": "Request body is required"})
        if not isinstance(data, dict):
            return validation_error_response({"error": "Request body must be a JSON object"})

        # JSON null or non-string values are treated as missing so they are
        # reported as validation errors, rather than .strip() raising
        # AttributeError.
        raw_user_id = data.get('user_id')
        raw_secret = data.get('secret')
        raw_password = data.get('password')
        user_id = raw_user_id.strip() if isinstance(raw_user_id, str) else ''
        secret = raw_secret.strip() if isinstance(raw_secret, str) else ''
        password = raw_password if isinstance(raw_password, str) else ''

        # Validate inputs; collect every failure so the client sees them all
        validation_errors = {}

        if not user_id:
            validation_errors['user_id'] = "User ID is required"

        if not secret:
            validation_errors['secret'] = "Reset secret is required"

        password_valid, password_error = validate_password(password)
        if not password_valid:
            validation_errors['password'] = password_error

        if validation_errors:
            return validation_error_response(validation_errors)

        # Confirm password reset
        appwrite = AppwriteService()
        appwrite.confirm_password_reset(user_id=user_id, secret=secret, password=password)

        # Invalidate all cached sessions for this user (security: password changed)
        cache = SessionCacheService()
        cache.invalidate_user(user_id)

        logger.info("Password reset successfully", user_id=user_id)

        return success_response(
            result={
                "message": "Password reset successful. You can now log in with your new password."
            }
        )

    except AppwriteException as e:
        logger.error("Password reset failed", error=str(e), code=e.code)
        return error_response(
            message="Password reset failed. The link may be invalid or expired.",
            code="PASSWORD_RESET_ERROR"
        )

    except Exception as e:
        logger.error("Unexpected error during password reset", error=str(e))
        return error_response(message="An unexpected error occurred", code="INTERNAL_ERROR")
# ===== TEMPLATE ROUTES (for rendering HTML pages) =====
@auth_bp.route('/login', methods=['GET'])
def login_page():
    """Serve the HTML login form (template ``auth/login.html``)."""
    template_name = 'auth/login.html'
    return render_template(template_name)
@auth_bp.route('/register', methods=['GET'])
def register_page():
    """Serve the HTML registration form (template ``auth/register.html``)."""
    template_name = 'auth/register.html'
    return render_template(template_name)
@auth_bp.route('/forgot-password', methods=['GET'])
def forgot_password_page():
    """Serve the HTML forgot-password form (template ``auth/forgot_password.html``)."""
    template_name = 'auth/forgot_password.html'
    return render_template(template_name)
@auth_bp.route('/reset-password', methods=['GET'])
def reset_password_page():
    """
    Serve the HTML reset-password form.

    The ``userId`` and ``secret`` query parameters (empty string when absent)
    are passed to the template as ``user_id`` and ``secret``.
    """
    query = request.args
    template_context = {
        'user_id': query.get('userId', ''),
        'secret': query.get('secret', ''),
    }
    return render_template('auth/reset_password.html', **template_context)