589 lines
20 KiB
Python
589 lines
20 KiB
Python
"""
|
|
Appwrite Service Wrapper
|
|
|
|
This module provides a wrapper around the Appwrite SDK for handling user authentication,
|
|
session management, and user data operations. It abstracts Appwrite's API to provide
|
|
a clean interface for the application.
|
|
|
|
Usage:
|
|
from app.services.appwrite_service import AppwriteService
|
|
|
|
# Initialize service
|
|
service = AppwriteService()
|
|
|
|
# Register a new user
|
|
user = service.register_user(
|
|
email="player@example.com",
|
|
password="SecurePass123!",
|
|
name="Brave Adventurer"
|
|
)
|
|
|
|
# Login
|
|
session = service.login_user(
|
|
email="player@example.com",
|
|
password="SecurePass123!"
|
|
)
|
|
"""
|
|
|
|
import os
|
|
from typing import Optional, Dict, Any
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
|
|
from appwrite.client import Client
|
|
from appwrite.services.account import Account
|
|
from appwrite.services.users import Users
|
|
from appwrite.exception import AppwriteException
|
|
from appwrite.id import ID
|
|
|
|
from app.utils.logging import get_logger
|
|
|
|
|
|
# Initialize logger
|
|
# Module-level logger used by every method of AppwriteService below.
# NOTE(review): the logger is keyed on __file__ (this module's path) rather
# than __name__ - confirm that is what app.utils.logging.get_logger expects.
logger = get_logger(__file__)
|
|
|
|
|
|
@dataclass
class UserData:
    """
    Snapshot of a user account as exposed to the rest of the application.

    Attributes:
        id: Unique user identifier
        email: Email address of the account
        name: Display name of the user
        email_verified: True once the email address has been verified
        tier: Subscription tier (free, basic, premium, elite)
        created_at: Time the account was created
        updated_at: Time the account was last modified
    """
    id: str
    email: str
    name: str
    email_verified: bool
    tier: str
    created_at: datetime
    updated_at: datetime

    def to_dict(self) -> Dict[str, Any]:
        """Return the user as a plain dict, rendering datetime timestamps as ISO 8601 strings."""

        def as_iso(value: Any) -> Any:
            # Anything that is not a datetime (e.g. an already-serialised
            # string) is passed through untouched.
            if isinstance(value, datetime):
                return value.isoformat()
            return value

        payload: Dict[str, Any] = {
            "id": self.id,
            "email": self.email,
            "name": self.name,
            "email_verified": self.email_verified,
            "tier": self.tier,
        }
        payload["created_at"] = as_iso(self.created_at)
        payload["updated_at"] = as_iso(self.updated_at)
        return payload
|
|
|
|
|
|
@dataclass
class SessionData:
    """
    Snapshot of an authenticated session.

    Attributes:
        session_id: Identifier of the session
        user_id: ID of the user that owns the session
        provider: Authentication provider (email, oauth, etc.)
        expire: Time at which the session stops being valid
    """
    session_id: str
    user_id: str
    provider: str
    expire: datetime

    def to_dict(self) -> Dict[str, Any]:
        """Return the session as a plain dict, rendering a datetime expiry as an ISO 8601 string."""
        expiry = self.expire
        if isinstance(expiry, datetime):
            expiry = expiry.isoformat()

        return dict(
            session_id=self.session_id,
            user_id=self.user_id,
            provider=self.provider,
            expire=expiry,
        )
|
|
|
|
|
|
class AppwriteService:
|
|
"""
|
|
Service class for interacting with Appwrite authentication and user management.
|
|
|
|
This class provides methods for:
|
|
- User registration and email verification
|
|
- User login and logout
|
|
- Session management
|
|
- Password reset
|
|
- User tier management
|
|
"""
|
|
|
|
def __init__(self):
    """
    Build the Appwrite client and the service objects that use it.

    Configuration is taken from the environment:
        - APPWRITE_ENDPOINT: Appwrite API endpoint
        - APPWRITE_PROJECT_ID: Appwrite project ID
        - APPWRITE_API_KEY: Appwrite API key (for server-side operations)

    Raises:
        ValueError: If any of the three variables is unset or empty
    """
    self.endpoint = os.getenv('APPWRITE_ENDPOINT')
    self.project_id = os.getenv('APPWRITE_PROJECT_ID')
    self.api_key = os.getenv('APPWRITE_API_KEY')

    # All three settings are mandatory; an empty string counts as missing.
    config_complete = bool(self.endpoint and self.project_id and self.api_key)
    if not config_complete:
        logger.error("Missing Appwrite configuration in environment variables")
        raise ValueError("Appwrite configuration incomplete. Check APPWRITE_* environment variables.")

    # One API-key client is shared by both service objects.
    self.client = Client()
    for configure, value in (
        (self.client.set_endpoint, self.endpoint),
        (self.client.set_project, self.project_id),
        (self.client.set_key, self.api_key),
    ):
        configure(value)

    self.account = Account(self.client)
    self.users = Users(self.client)

    logger.info("Appwrite service initialized", endpoint=self.endpoint, project_id=self.project_id)
|
|
|
|
def register_user(self, email: str, password: str, name: str) -> UserData:
    """
    Register a new user account.

    This method:
        1. Creates a new user through the Appwrite Users service
        2. Tries to store the default tier ('free') in the user's preferences
        3. Returns the created user as UserData

    Storing the tier is best-effort: once the account exists, a failure to
    write preferences is logged and registration still succeeds. Raising at
    that point would report a failure for an account that was in fact
    created, and a retry would then be rejected as a duplicate. A missing
    tier preference is read back as 'free' by this service anyway.

    This method does not send a verification email itself.

    Args:
        email: User's email address
        password: User's password (passed to Appwrite as given)
        name: User's display name

    Returns:
        UserData object with user information

    Raises:
        AppwriteException: If the account cannot be created
            (e.g., email already exists)
    """
    logger.info("Attempting to register new user", email=email, name=name)

    try:
        user = self.users.create(
            user_id=ID.unique(),
            email=email,
            password=password,
            name=name,
        )
    except AppwriteException as e:
        logger.error("Failed to register user", email=email, error=str(e), code=e.code)
        raise

    user_id = user['$id']
    logger.info("User created successfully", user_id=user_id, email=email)

    # The account exists from here on, so a preferences failure must not be
    # reported to the caller as a failed registration.
    try:
        self.users.update_prefs(
            user_id=user_id,
            prefs={
                'tier': 'free',
                'tier_updated_at': datetime.now(timezone.utc).isoformat(),
            },
        )
    except AppwriteException as e:
        logger.warning(
            "User created but default tier could not be stored",
            user_id=user_id,
            error=str(e),
            code=e.code,
        )
    else:
        logger.info("User tier set to 'free'", user_id=user_id)

    # NOTE(review): no verification email is requested here; the original
    # comments assumed Appwrite sends one on its own - confirm against the
    # Appwrite console configuration.
    logger.info("User created, email verification handled by Appwrite", user_id=user_id, email=email)

    return self._user_to_userdata(user)
|
|
|
|
def login_user(self, email: str, password: str) -> tuple[SessionData, UserData]:
    """
    Authenticate a user and create a session.

    The session is created through the API-key client (self.client). The
    code relies on that client's response carrying the session 'secret',
    which is what gets stored in SessionData.session_id.

    Args:
        email: User's email address
        password: User's password

    Returns:
        Tuple of (SessionData, UserData). SessionData.session_id holds the
        session secret, not the Appwrite session '$id'.

    Raises:
        AppwriteException: If login fails (invalid credentials, etc.). Any
            other error is re-raised as AppwriteException with code 500,
            chained to the original exception.
    """
    try:
        logger.info("Attempting user login", email=email)

        # Account is already imported at module level; a fresh service
        # object is bound to the API-key client as in the original code.
        admin_account = Account(self.client)

        user_session = admin_account.create_email_password_session(
            email=email,
            password=password
        )

        logger.info("Session created successfully",
                    user_id=user_session['userId'],
                    session_id=user_session['$id'])

        session_secret = user_session.get('secret', '')
        if not session_secret:
            logger.error("Session secret not found in response - this should not happen with admin client")
            raise AppwriteException("Failed to get session secret", code=500)

        user = self.users.get(user_id=user_session['userId'])

        session_data = SessionData(
            session_id=session_secret,  # Use the secret, not the session ID
            user_id=user_session['userId'],
            provider=user_session['provider'],
            expire=datetime.fromisoformat(user_session['expire'].replace('Z', '+00:00'))
        )
        user_data = self._user_to_userdata(user)

        return session_data, user_data

    except AppwriteException as e:
        logger.error("Failed to login user", email=email, error=str(e), code=e.code)
        raise
    except Exception as e:
        logger.error("Unexpected error during login", email=email, error=str(e), exc_info=True)
        # Chain the cause so the original traceback is not lost.
        raise AppwriteException(str(e), code=500) from e
|
|
|
|
def logout_user(self, session_id: str) -> bool:
    """
    Log out a user by deleting their session.

    A throw-away client is authenticated with the given session value and
    asked to delete its 'current' session.

    Args:
        session_id: The session value to authenticate with. It is passed
            to Client.set_session(), and login_user() stores the session
            secret in SessionData.session_id, so this is a credential.

    Returns:
        True if logout successful

    Raises:
        AppwriteException: If logout fails
    """
    import hashlib

    # The value is a credential, so it must never be written to the logs.
    # A short digest is enough to correlate log lines for one session.
    session_fingerprint = hashlib.sha256(str(session_id).encode("utf-8")).hexdigest()[:12]

    try:
        logger.info("Attempting to logout user", session_fingerprint=session_fingerprint)

        # Client and Account are already imported at module level.
        session_client = Client()
        session_client.set_endpoint(self.endpoint)
        session_client.set_project(self.project_id)
        session_client.set_session(session_id)

        Account(session_client).delete_session('current')

        logger.info("User logged out successfully", session_fingerprint=session_fingerprint)
        return True

    except AppwriteException as e:
        logger.error(
            "Failed to logout user",
            session_fingerprint=session_fingerprint,
            error=str(e),
            code=e.code,
        )
        raise
|
|
|
|
def verify_email(self, user_id: str, secret: str) -> bool:
    """
    Mark a user's email address as verified.

    The Users service is used to set the verification flag directly.

    NOTE(review): the secret is NOT validated here - it is only reduced to
    a boolean for logging. Any caller that knows a user_id can mark that
    email as verified, so the token must be checked before this method is
    called.

    Args:
        user_id: User ID
        secret: Verification secret from the email link (unchecked)

    Returns:
        True once the flag has been updated

    Raises:
        AppwriteException: If the Appwrite update call fails
    """
    has_secret = bool(secret)

    try:
        logger.info("Attempting to verify email", user_id=user_id, secret_provided=has_secret)

        # Sets the flag directly; no token check happens on this path.
        self.users.update_email_verification(user_id=user_id, email_verification=True)

        logger.info("Email verified successfully", user_id=user_id)
    except AppwriteException as exc:
        logger.error("Failed to verify email", user_id=user_id, error=str(exc), code=exc.code)
        raise

    return True
|
|
|
|
def request_password_reset(self, email: str) -> bool:
    """
    Record a password reset request for a user.

    No email is sent and no recovery token is created by this method; it
    only logs the request and a warning that the feature is incomplete.
    The result is always True so that callers cannot learn whether the
    email address exists.

    Args:
        email: User's email address

    Returns:
        Always True (for security - don't reveal if email exists)
    """
    try:
        logger.info("Password reset requested", email=email)

        # Placeholder: a real implementation needs a recovery token and an
        # email, e.g. via Appwrite's account recovery flow.
        logger.warning("Password reset not fully implemented - requires Appwrite email configuration", email=email)
    except Exception as exc:
        # Deliberately swallowed: a failure here must not change the
        # answer given to the caller.
        logger.warning("Password reset request encountered error", email=email, error=str(exc))

    return True
|
|
|
|
def confirm_password_reset(self, user_id: str, secret: str, password: str) -> bool:
    """
    Set a new password for a user.

    The password is written directly through the Users service.

    NOTE(review): the secret is NOT validated here - it is only reduced to
    a boolean for logging. Callers must verify the reset token themselves
    before calling; otherwise knowing a user_id is enough to change that
    user's password.

    Args:
        user_id: User ID
        secret: Reset secret from the email link (unchecked)
        password: New password

    Returns:
        True once the password has been updated

    Raises:
        AppwriteException: If the Appwrite update call fails
    """
    has_secret = bool(secret)

    try:
        logger.info("Attempting to reset password", user_id=user_id, secret_provided=has_secret)

        # Direct server-side update; no token check happens on this path.
        self.users.update_password(user_id=user_id, password=password)

        logger.info("Password reset successfully", user_id=user_id)
    except AppwriteException as exc:
        logger.error("Failed to reset password", user_id=user_id, error=str(exc), code=exc.code)
        raise

    return True
|
|
|
|
def get_user(self, user_id: str) -> UserData:
    """
    Look up a user by ID and return it as UserData.

    Args:
        user_id: User ID

    Returns:
        UserData object

    Raises:
        AppwriteException: If the lookup fails (e.g., user not found)
    """
    try:
        return self._user_to_userdata(self.users.get(user_id=user_id))
    except AppwriteException as exc:
        # Log for diagnostics, then let the caller decide how to react.
        logger.error("Failed to fetch user", user_id=user_id, error=str(exc), code=exc.code)
        raise
|
|
|
|
def get_session(self, session_id: str) -> SessionData:
    """
    Get session data and validate it's still active.

    A throw-away client is authenticated with the given session value and
    asked for its 'current' session; the expiry is then checked locally.

    Args:
        session_id: The session value to authenticate with. It is passed
            to Client.set_session(), and login_user() stores the session
            secret in SessionData.session_id, so this is a credential.

    Returns:
        SessionData object. NOTE(review): session_id here is the Appwrite
        session '$id', whereas login_user() returns the secret in the same
        field - confirm callers do not mix the two.

    Raises:
        AppwriteException: If session invalid or expired
    """
    import hashlib

    # The value is a credential, so it must never be written to the logs.
    # A short digest is enough to correlate log lines for one session.
    session_fingerprint = hashlib.sha256(str(session_id).encode("utf-8")).hexdigest()[:12]

    try:
        # Client and Account are already imported at module level.
        session_client = Client()
        session_client.set_endpoint(self.endpoint)
        session_client.set_project(self.project_id)
        session_client.set_session(session_id)

        # Fetching the current session fails if the session is not usable.
        session = Account(session_client).get_session('current')

        expire_time = datetime.fromisoformat(session['expire'].replace('Z', '+00:00'))
        if expire_time < datetime.now(timezone.utc):
            logger.warning(
                "Session expired",
                session_fingerprint=session_fingerprint,
                expired_at=expire_time,
            )
            raise AppwriteException("Session expired", code=401)

        return SessionData(
            session_id=session['$id'],
            user_id=session['userId'],
            provider=session['provider'],
            expire=expire_time
        )

    except AppwriteException as e:
        logger.error(
            "Failed to validate session",
            session_fingerprint=session_fingerprint,
            error=str(e),
            code=e.code,
        )
        raise
|
|
|
|
def get_user_tier(self, user_id: str) -> str:
    """
    Get the user's subscription tier.

    The tier is read from the 'tier' entry of the user's preferences.
    'free' is returned when the preferences are missing, are not a
    mapping, hold no tier, or when the Appwrite lookup fails.

    Args:
        user_id: User ID

    Returns:
        Tier string (free, basic, premium, elite)
    """
    try:
        logger.debug("Fetching user tier", user_id=user_id)

        user = self.users.get(user_id=user_id)

        # dict.get's default only applies when the key is absent; a present
        # but null / non-mapping 'prefs' would otherwise raise an
        # AttributeError that escapes the handler below.
        prefs = user.get('prefs')
        tier = (prefs.get('tier') if isinstance(prefs, dict) else None) or 'free'

        logger.debug("User tier retrieved", user_id=user_id, tier=tier)
        return tier

    except AppwriteException as e:
        logger.error("Failed to fetch user tier", user_id=user_id, error=str(e), code=e.code)
        # Default to free tier on error
        return 'free'
|
|
|
|
def set_user_tier(self, user_id: str, tier: str) -> bool:
    """
    Update the user's subscription tier.

    The user's existing preferences are read, 'tier' and 'tier_updated_at'
    are set on a copy, and the merged preferences are written back.

    Args:
        user_id: User ID
        tier: New tier (free, basic, premium, elite)

    Returns:
        True if update successful

    Raises:
        AppwriteException: If update fails
        ValueError: If tier is invalid
    """
    valid_tiers = ['free', 'basic', 'premium', 'elite']
    if tier not in valid_tiers:
        raise ValueError(f"Invalid tier: {tier}. Must be one of {valid_tiers}")

    try:
        logger.info("Updating user tier", user_id=user_id, new_tier=tier)

        user = self.users.get(user_id=user_id)

        # Work on a copy so the response object is not mutated, and fall
        # back to an empty mapping when 'prefs' is present but null or not
        # a dict (item assignment would otherwise raise TypeError).
        current_prefs = user.get('prefs')
        prefs = dict(current_prefs) if isinstance(current_prefs, dict) else {}

        prefs['tier'] = tier
        prefs['tier_updated_at'] = datetime.now(timezone.utc).isoformat()

        self.users.update_prefs(user_id=user_id, prefs=prefs)

        logger.info("User tier updated successfully", user_id=user_id, tier=tier)
        return True

    except AppwriteException as e:
        logger.error("Failed to update user tier", user_id=user_id, tier=tier, error=str(e), code=e.code)
        raise
|
|
|
|
def _user_to_userdata(self, user: Dict[str, Any]) -> UserData:
    """
    Convert Appwrite user object to UserData dataclass.

    Reads '$id', 'email' and 'name' (required), 'emailVerification'
    (defaults to False), the 'tier' preference (defaults to 'free') and
    the '$createdAt' / '$updatedAt' timestamps (default to the current
    UTC time when missing or empty).

    Args:
        user: Appwrite user dictionary

    Returns:
        UserData object

    Raises:
        KeyError: If '$id', 'email' or 'name' is missing
        ValueError: If a timestamp string is not valid ISO 8601
    """

    def parse_timestamp(raw: Any) -> Any:
        # Missing, null or empty values fall back to "now" directly instead
        # of formatting the current time and parsing it back.
        if raw is None or raw == '':
            return datetime.now(timezone.utc)
        if isinstance(raw, str):
            # fromisoformat() in older Pythons rejects a trailing 'Z'.
            return datetime.fromisoformat(raw.replace('Z', '+00:00'))
        # Non-string values (e.g. an existing datetime) pass through as before.
        return raw

    # dict.get's default only applies when the key is absent; guard against
    # a present but null / non-mapping 'prefs' as well.
    prefs = user.get('prefs')
    tier = (prefs.get('tier') if isinstance(prefs, dict) else None) or 'free'

    return UserData(
        id=user['$id'],
        email=user['email'],
        name=user['name'],
        email_verified=user.get('emailVerification', False),
        tier=tier,
        created_at=parse_timestamp(user.get('$createdAt')),
        updated_at=parse_timestamp(user.get('$updatedAt')),
    )
|