""" Appwrite Service Wrapper This module provides a wrapper around the Appwrite SDK for handling user authentication, session management, and user data operations. It abstracts Appwrite's API to provide a clean interface for the application. Usage: from app.services.appwrite_service import AppwriteService # Initialize service service = AppwriteService() # Register a new user user = service.register_user( email="player@example.com", password="SecurePass123!", name="Brave Adventurer" ) # Login session = service.login_user( email="player@example.com", password="SecurePass123!" ) """ import os from typing import Optional, Dict, Any from dataclasses import dataclass from datetime import datetime, timezone from appwrite.client import Client from appwrite.services.account import Account from appwrite.services.users import Users from appwrite.exception import AppwriteException from appwrite.id import ID from app.utils.logging import get_logger # Initialize logger logger = get_logger(__file__) @dataclass class UserData: """ Data class representing a user in the system. Attributes: id: Unique user identifier email: User's email address name: User's display name email_verified: Whether email has been verified tier: User's subscription tier (free, basic, premium, elite) created_at: When the user account was created updated_at: When the user account was last updated """ id: str email: str name: str email_verified: bool tier: str created_at: datetime updated_at: datetime def to_dict(self) -> Dict[str, Any]: """Convert user data to dictionary.""" return { "id": self.id, "email": self.email, "name": self.name, "email_verified": self.email_verified, "tier": self.tier, "created_at": self.created_at.isoformat() if isinstance(self.created_at, datetime) else self.created_at, "updated_at": self.updated_at.isoformat() if isinstance(self.updated_at, datetime) else self.updated_at, } @dataclass class SessionData: """ Data class representing a user session. Attributes: session_id: Unique session identifier user_id: User ID associated with this session provider: Authentication provider (email, oauth, etc.) expire: When the session expires """ session_id: str user_id: str provider: str expire: datetime def to_dict(self) -> Dict[str, Any]: """Convert session data to dictionary.""" return { "session_id": self.session_id, "user_id": self.user_id, "provider": self.provider, "expire": self.expire.isoformat() if isinstance(self.expire, datetime) else self.expire, } class AppwriteService: """ Service class for interacting with Appwrite authentication and user management. This class provides methods for: - User registration and email verification - User login and logout - Session management - Password reset - User tier management """ def __init__(self): """ Initialize the Appwrite service. Reads configuration from environment variables: - APPWRITE_ENDPOINT: Appwrite API endpoint - APPWRITE_PROJECT_ID: Appwrite project ID - APPWRITE_API_KEY: Appwrite API key (for server-side operations) """ self.endpoint = os.getenv('APPWRITE_ENDPOINT') self.project_id = os.getenv('APPWRITE_PROJECT_ID') self.api_key = os.getenv('APPWRITE_API_KEY') if not all([self.endpoint, self.project_id, self.api_key]): logger.error("Missing Appwrite configuration in environment variables") raise ValueError("Appwrite configuration incomplete. Check APPWRITE_* environment variables.") # Initialize Appwrite client self.client = Client() self.client.set_endpoint(self.endpoint) self.client.set_project(self.project_id) self.client.set_key(self.api_key) # Initialize services self.account = Account(self.client) self.users = Users(self.client) logger.info("Appwrite service initialized", endpoint=self.endpoint, project_id=self.project_id) def register_user(self, email: str, password: str, name: str) -> UserData: """ Register a new user account. This method: 1. Creates a new user in Appwrite Auth 2. Sets the user's tier to 'free' in preferences 3. Triggers email verification 4. Returns user data Args: email: User's email address password: User's password (will be hashed by Appwrite) name: User's display name Returns: UserData object with user information Raises: AppwriteException: If registration fails (e.g., email already exists) """ try: logger.info("Attempting to register new user", email=email, name=name) # Generate unique user ID user_id = ID.unique() # Create user account user = self.users.create( user_id=user_id, email=email, password=password, name=name ) logger.info("User created successfully", user_id=user['$id'], email=email) # Set default tier to 'free' in user preferences self.users.update_prefs( user_id=user['$id'], prefs={ 'tier': 'free', 'tier_updated_at': datetime.now(timezone.utc).isoformat() } ) logger.info("User tier set to 'free'", user_id=user['$id']) # Note: Email verification is handled by Appwrite automatically # when email templates are configured in the Appwrite console. # For server-side user creation, verification emails are sent # automatically if the email provider is configured. # # To manually trigger verification, users can use the Account service # (client-side) after logging in, or configure email verification # settings in the Appwrite console. logger.info("User created, email verification handled by Appwrite", user_id=user['$id'], email=email) # Return user data return self._user_to_userdata(user) except AppwriteException as e: logger.error("Failed to register user", email=email, error=str(e), code=e.code) raise def login_user(self, email: str, password: str) -> tuple[SessionData, UserData]: """ Authenticate a user and create a session. For server-side authentication, we create a temporary client with user credentials to verify them, then create a session using the server SDK. Args: email: User's email address password: User's password Returns: Tuple of (SessionData, UserData) Raises: AppwriteException: If login fails (invalid credentials, etc.) """ try: logger.info("Attempting user login", email=email) # Use admin client (with API key) to create session # This is required to get the session secret in the response from appwrite.services.account import Account admin_account = Account(self.client) # self.client already has API key set # Create email/password session using admin client # When using admin client, the 'secret' field is populated in the response user_session = admin_account.create_email_password_session( email=email, password=password ) logger.info("Session created successfully", user_id=user_session['userId'], session_id=user_session['$id']) # Extract session secret from response # Admin client populates this field, unlike regular client session_secret = user_session.get('secret', '') if not session_secret: logger.error("Session secret not found in response - this should not happen with admin client") raise AppwriteException("Failed to get session secret", code=500) # Get user data using server SDK user = self.users.get(user_id=user_session['userId']) # Convert to our data classes session_data = SessionData( session_id=session_secret, # Use the secret, not the session ID user_id=user_session['userId'], provider=user_session['provider'], expire=datetime.fromisoformat(user_session['expire'].replace('Z', '+00:00')) ) user_data = self._user_to_userdata(user) return session_data, user_data except AppwriteException as e: logger.error("Failed to login user", email=email, error=str(e), code=e.code) raise except Exception as e: logger.error("Unexpected error during login", email=email, error=str(e), exc_info=True) raise AppwriteException(str(e), code=500) def logout_user(self, session_id: str) -> bool: """ Log out a user by deleting their session. Args: session_id: The session ID to delete Returns: True if logout successful Raises: AppwriteException: If logout fails """ try: logger.info("Attempting to logout user", session_id=session_id) # For server-side, we need to delete the session using Users service # First get the session to find the user_id # Note: Appwrite doesn't have a direct server-side session delete by session_id # We'll use a workaround by creating a client with the session and deleting it from appwrite.client import Client from appwrite.services.account import Account # Create client with the session session_client = Client() session_client.set_endpoint(self.endpoint) session_client.set_project(self.project_id) session_client.set_session(session_id) session_account = Account(session_client) # Delete the current session session_account.delete_session('current') logger.info("User logged out successfully", session_id=session_id) return True except AppwriteException as e: logger.error("Failed to logout user", session_id=session_id, error=str(e), code=e.code) raise def verify_email(self, user_id: str, secret: str) -> bool: """ Verify a user's email address. Note: Email verification with server-side SDK requires updating the user's emailVerification status directly, or using Appwrite's built-in verification flow through the Account service (client-side). Args: user_id: User ID secret: Verification secret from email link (not validated server-side) Returns: True if verification successful Raises: AppwriteException: If verification fails (invalid/expired secret) """ try: logger.info("Attempting to verify email", user_id=user_id, secret_provided=bool(secret)) # For server-side verification, we update the user's email verification status # The secret validation should be done by Appwrite's verification flow # For now, we'll mark the email as verified # In production, you should validate the secret token before updating self.users.update_email_verification(user_id=user_id, email_verification=True) logger.info("Email verified successfully", user_id=user_id) return True except AppwriteException as e: logger.error("Failed to verify email", user_id=user_id, error=str(e), code=e.code) raise def request_password_reset(self, email: str) -> bool: """ Request a password reset for a user. This sends a password reset email to the user. For security, it always returns True even if the email doesn't exist. Note: Password reset is handled through Appwrite's built-in Account service recovery flow. For server-side operations, we would need to create a password recovery token manually. Args: email: User's email address Returns: Always True (for security - don't reveal if email exists) """ try: logger.info("Password reset requested", email=email) # Note: Password reset with server-side SDK requires creating # a recovery token. For now, we'll log this and return success. # In production, configure Appwrite's email templates and use # client-side Account.createRecovery() or implement custom token # generation and email sending. logger.warning("Password reset not fully implemented - requires Appwrite email configuration", email=email) except Exception as e: # Log the error but still return True for security # Don't reveal whether the email exists logger.warning("Password reset request encountered error", email=email, error=str(e)) # Always return True to not reveal if email exists return True def confirm_password_reset(self, user_id: str, secret: str, password: str) -> bool: """ Confirm a password reset and update the user's password. Note: For server-side operations, we update the password directly using the Users service. Secret validation would be handled separately. Args: user_id: User ID secret: Reset secret from email link (should be validated before calling) password: New password Returns: True if password reset successful Raises: AppwriteException: If reset fails """ try: logger.info("Attempting to reset password", user_id=user_id, secret_provided=bool(secret)) # For server-side password reset, update the password directly # In production, you should validate the secret token first before calling this # The secret parameter is kept for API compatibility but not validated here self.users.update_password(user_id=user_id, password=password) logger.info("Password reset successfully", user_id=user_id) return True except AppwriteException as e: logger.error("Failed to reset password", user_id=user_id, error=str(e), code=e.code) raise def get_user(self, user_id: str) -> UserData: """ Get user data by user ID. Args: user_id: User ID Returns: UserData object Raises: AppwriteException: If user not found """ try: user = self.users.get(user_id=user_id) return self._user_to_userdata(user) except AppwriteException as e: logger.error("Failed to fetch user", user_id=user_id, error=str(e), code=e.code) raise def get_session(self, session_id: str) -> SessionData: """ Get session data and validate it's still active. Args: session_id: Session ID Returns: SessionData object Raises: AppwriteException: If session invalid or expired """ try: # Create a client with the session to validate it from appwrite.client import Client from appwrite.services.account import Account session_client = Client() session_client.set_endpoint(self.endpoint) session_client.set_project(self.project_id) session_client.set_session(session_id) session_account = Account(session_client) # Get the current session (this validates it exists and is active) session = session_account.get_session('current') # Check if session is expired expire_time = datetime.fromisoformat(session['expire'].replace('Z', '+00:00')) if expire_time < datetime.now(timezone.utc): logger.warning("Session expired", session_id=session_id, expired_at=expire_time) raise AppwriteException("Session expired", code=401) return SessionData( session_id=session['$id'], user_id=session['userId'], provider=session['provider'], expire=expire_time ) except AppwriteException as e: logger.error("Failed to validate session", session_id=session_id, error=str(e), code=e.code) raise def get_user_tier(self, user_id: str) -> str: """ Get the user's subscription tier. Args: user_id: User ID Returns: Tier string (free, basic, premium, elite) """ try: logger.debug("Fetching user tier", user_id=user_id) user = self.users.get(user_id=user_id) prefs = user.get('prefs', {}) tier = prefs.get('tier', 'free') logger.debug("User tier retrieved", user_id=user_id, tier=tier) return tier except AppwriteException as e: logger.error("Failed to fetch user tier", user_id=user_id, error=str(e), code=e.code) # Default to free tier on error return 'free' def set_user_tier(self, user_id: str, tier: str) -> bool: """ Update the user's subscription tier. Args: user_id: User ID tier: New tier (free, basic, premium, elite) Returns: True if update successful Raises: AppwriteException: If update fails ValueError: If tier is invalid """ valid_tiers = ['free', 'basic', 'premium', 'elite'] if tier not in valid_tiers: raise ValueError(f"Invalid tier: {tier}. Must be one of {valid_tiers}") try: logger.info("Updating user tier", user_id=user_id, new_tier=tier) # Get current preferences user = self.users.get(user_id=user_id) prefs = user.get('prefs', {}) # Update tier prefs['tier'] = tier prefs['tier_updated_at'] = datetime.now(timezone.utc).isoformat() self.users.update_prefs(user_id=user_id, prefs=prefs) logger.info("User tier updated successfully", user_id=user_id, tier=tier) return True except AppwriteException as e: logger.error("Failed to update user tier", user_id=user_id, tier=tier, error=str(e), code=e.code) raise def _user_to_userdata(self, user: Dict[str, Any]) -> UserData: """ Convert Appwrite user object to UserData dataclass. Args: user: Appwrite user dictionary Returns: UserData object """ # Get tier from preferences, default to 'free' prefs = user.get('prefs', {}) tier = prefs.get('tier', 'free') # Parse timestamps created_at = user.get('$createdAt', datetime.now(timezone.utc).isoformat()) updated_at = user.get('$updatedAt', datetime.now(timezone.utc).isoformat()) if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) if isinstance(updated_at, str): updated_at = datetime.fromisoformat(updated_at.replace('Z', '+00:00')) return UserData( id=user['$id'], email=user['email'], name=user['name'], email_verified=user.get('emailVerification', False), tier=tier, created_at=created_at, updated_at=updated_at )