""" Usage Tracking Service for AI cost and usage monitoring. This service tracks all AI usage events, calculates costs, and provides analytics for monitoring and rate limiting purposes. Usage: from app.services.usage_tracking_service import UsageTrackingService tracker = UsageTrackingService() # Log a usage event tracker.log_usage( user_id="user_123", model="anthropic/claude-3.5-sonnet", tokens_input=100, tokens_output=350, task_type=TaskType.STORY_PROGRESSION ) # Get daily usage usage = tracker.get_daily_usage("user_123", date.today()) print(f"Total requests: {usage.total_requests}") print(f"Estimated cost: ${usage.estimated_cost:.4f}") """ import os from datetime import datetime, timezone, date, timedelta from typing import Dict, Any, List, Optional from uuid import uuid4 from appwrite.client import Client from appwrite.services.tables_db import TablesDB from appwrite.exception import AppwriteException from appwrite.id import ID from appwrite.query import Query from app.utils.logging import get_logger from app.models.ai_usage import ( AIUsageLog, DailyUsageSummary, MonthlyUsageSummary, TaskType ) logger = get_logger(__file__) # Cost per 1000 tokens by model (in USD) # These are estimates based on Replicate pricing MODEL_COSTS = { # Llama models (via Replicate) - very cheap "meta/meta-llama-3-8b-instruct": { "input": 0.0001, # $0.0001 per 1K input tokens "output": 0.0001, # $0.0001 per 1K output tokens }, "meta/meta-llama-3-70b-instruct": { "input": 0.0006, "output": 0.0006, }, # Claude models (via Replicate) "anthropic/claude-3.5-haiku": { "input": 0.001, # $0.001 per 1K input tokens "output": 0.005, # $0.005 per 1K output tokens }, "anthropic/claude-3-haiku": { "input": 0.00025, "output": 0.00125, }, "anthropic/claude-3.5-sonnet": { "input": 0.003, # $0.003 per 1K input tokens "output": 0.015, # $0.015 per 1K output tokens }, "anthropic/claude-4.5-sonnet": { "input": 0.003, "output": 0.015, }, "anthropic/claude-3-opus": { "input": 0.015, # $0.015 per 1K input tokens "output": 0.075, # $0.075 per 1K output tokens }, } # Default cost for unknown models DEFAULT_COST = {"input": 0.001, "output": 0.005} class UsageTrackingService: """ Service for tracking AI usage and calculating costs. This service provides: - Logging individual AI usage events to Appwrite - Calculating estimated costs based on model pricing - Retrieving daily and monthly usage summaries - Analytics for monitoring and rate limiting The service stores usage logs in an Appwrite collection named 'ai_usage_logs'. """ # Collection ID for usage logs COLLECTION_ID = "ai_usage_logs" def __init__(self): """ Initialize the usage tracking service. Reads configuration from environment variables: - APPWRITE_ENDPOINT: Appwrite API endpoint - APPWRITE_PROJECT_ID: Appwrite project ID - APPWRITE_API_KEY: Appwrite API key - APPWRITE_DATABASE_ID: Appwrite database ID Raises: ValueError: If required environment variables are missing """ self.endpoint = os.getenv('APPWRITE_ENDPOINT') self.project_id = os.getenv('APPWRITE_PROJECT_ID') self.api_key = os.getenv('APPWRITE_API_KEY') self.database_id = os.getenv('APPWRITE_DATABASE_ID', 'main') if not all([self.endpoint, self.project_id, self.api_key]): logger.error("Missing Appwrite configuration in environment variables") raise ValueError("Appwrite configuration incomplete. Check APPWRITE_* environment variables.") # Initialize Appwrite client self.client = Client() self.client.set_endpoint(self.endpoint) self.client.set_project(self.project_id) self.client.set_key(self.api_key) # Initialize TablesDB service self.tables_db = TablesDB(self.client) logger.info("UsageTrackingService initialized", database_id=self.database_id) def log_usage( self, user_id: str, model: str, tokens_input: int, tokens_output: int, task_type: TaskType, session_id: Optional[str] = None, character_id: Optional[str] = None, request_duration_ms: int = 0, success: bool = True, error_message: Optional[str] = None ) -> AIUsageLog: """ Log an AI usage event. This method creates a new usage log entry in Appwrite with all relevant information about the AI request including calculated estimated cost. Args: user_id: User who made the request model: Model identifier (e.g., "anthropic/claude-3.5-sonnet") tokens_input: Number of input tokens (prompt) tokens_output: Number of output tokens (response) task_type: Type of task (story, combat, quest, npc) session_id: Optional game session ID character_id: Optional character ID request_duration_ms: Request duration in milliseconds success: Whether the request succeeded error_message: Error message if failed Returns: AIUsageLog with the logged data Raises: AppwriteException: If storage fails """ # Calculate total tokens tokens_total = tokens_input + tokens_output # Calculate estimated cost estimated_cost = self._calculate_cost(model, tokens_input, tokens_output) # Generate log ID log_id = str(uuid4()) # Create usage log usage_log = AIUsageLog( log_id=log_id, user_id=user_id, timestamp=datetime.now(timezone.utc), model=model, tokens_input=tokens_input, tokens_output=tokens_output, tokens_total=tokens_total, estimated_cost=estimated_cost, task_type=task_type, session_id=session_id, character_id=character_id, request_duration_ms=request_duration_ms, success=success, error_message=error_message, ) try: # Store in Appwrite result = self.tables_db.create_row( database_id=self.database_id, table_id=self.COLLECTION_ID, row_id=log_id, data=usage_log.to_dict() ) logger.info( "AI usage logged", log_id=log_id, user_id=user_id, model=model, tokens_total=tokens_total, estimated_cost=estimated_cost, task_type=task_type.value, success=success ) return usage_log except AppwriteException as e: logger.error( "Failed to log AI usage", user_id=user_id, model=model, error=str(e), code=e.code ) raise def get_daily_usage(self, user_id: str, target_date: date) -> DailyUsageSummary: """ Get AI usage summary for a specific day. Args: user_id: User ID to get usage for target_date: Date to get usage for Returns: DailyUsageSummary with aggregated usage data Raises: AppwriteException: If query fails """ try: # Build date range for the target day (UTC) start_of_day = datetime.combine(target_date, datetime.min.time()).replace(tzinfo=timezone.utc) end_of_day = datetime.combine(target_date, datetime.max.time()).replace(tzinfo=timezone.utc) # Query usage logs for this user and date result = self.tables_db.list_rows( database_id=self.database_id, table_id=self.COLLECTION_ID, queries=[ Query.equal("user_id", user_id), Query.greater_than_equal("timestamp", start_of_day.isoformat()), Query.less_than_equal("timestamp", end_of_day.isoformat()), Query.limit(1000) # Cap at 1000 entries per day ] ) # Aggregate the data total_requests = 0 total_tokens = 0 total_input_tokens = 0 total_output_tokens = 0 total_cost = 0.0 requests_by_task: Dict[str, int] = {} for doc in result['rows']: total_requests += 1 total_tokens += doc.get('tokens_total', 0) total_input_tokens += doc.get('tokens_input', 0) total_output_tokens += doc.get('tokens_output', 0) total_cost += doc.get('estimated_cost', 0.0) task_type = doc.get('task_type', 'general') requests_by_task[task_type] = requests_by_task.get(task_type, 0) + 1 summary = DailyUsageSummary( date=target_date, user_id=user_id, total_requests=total_requests, total_tokens=total_tokens, total_input_tokens=total_input_tokens, total_output_tokens=total_output_tokens, estimated_cost=total_cost, requests_by_task=requests_by_task ) logger.debug( "Daily usage retrieved", user_id=user_id, date=target_date.isoformat(), total_requests=total_requests, estimated_cost=total_cost ) return summary except AppwriteException as e: logger.error( "Failed to get daily usage", user_id=user_id, date=target_date.isoformat(), error=str(e), code=e.code ) raise def get_monthly_cost(self, user_id: str, year: int, month: int) -> MonthlyUsageSummary: """ Get AI usage cost summary for a specific month. Args: user_id: User ID to get cost for year: Year (e.g., 2025) month: Month (1-12) Returns: MonthlyUsageSummary with aggregated cost data Raises: AppwriteException: If query fails ValueError: If month is invalid """ if not 1 <= month <= 12: raise ValueError(f"Invalid month: {month}. Must be 1-12.") try: # Build date range for the month start_of_month = datetime(year, month, 1, 0, 0, 0, tzinfo=timezone.utc) # Calculate end of month if month == 12: end_of_month = datetime(year + 1, 1, 1, 0, 0, 0, tzinfo=timezone.utc) - timedelta(seconds=1) else: end_of_month = datetime(year, month + 1, 1, 0, 0, 0, tzinfo=timezone.utc) - timedelta(seconds=1) # Query usage logs for this user and month result = self.tables_db.list_rows( database_id=self.database_id, table_id=self.COLLECTION_ID, queries=[ Query.equal("user_id", user_id), Query.greater_than_equal("timestamp", start_of_month.isoformat()), Query.less_than_equal("timestamp", end_of_month.isoformat()), Query.limit(5000) # Cap at 5000 entries per month ] ) # Aggregate the data total_requests = 0 total_tokens = 0 total_cost = 0.0 for doc in result['rows']: total_requests += 1 total_tokens += doc.get('tokens_total', 0) total_cost += doc.get('estimated_cost', 0.0) summary = MonthlyUsageSummary( year=year, month=month, user_id=user_id, total_requests=total_requests, total_tokens=total_tokens, estimated_cost=total_cost ) logger.debug( "Monthly cost retrieved", user_id=user_id, year=year, month=month, total_requests=total_requests, estimated_cost=total_cost ) return summary except AppwriteException as e: logger.error( "Failed to get monthly cost", user_id=user_id, year=year, month=month, error=str(e), code=e.code ) raise def get_total_daily_cost(self, target_date: date) -> float: """ Get the total AI cost across all users for a specific day. Used for admin monitoring and alerting. Args: target_date: Date to get cost for Returns: Total estimated cost in USD Raises: AppwriteException: If query fails """ try: # Build date range for the target day start_of_day = datetime.combine(target_date, datetime.min.time()).replace(tzinfo=timezone.utc) end_of_day = datetime.combine(target_date, datetime.max.time()).replace(tzinfo=timezone.utc) # Query all usage logs for this date result = self.tables_db.list_rows( database_id=self.database_id, table_id=self.COLLECTION_ID, queries=[ Query.greater_than_equal("timestamp", start_of_day.isoformat()), Query.less_than_equal("timestamp", end_of_day.isoformat()), Query.limit(10000) ] ) # Sum up costs total_cost = sum(doc.get('estimated_cost', 0.0) for doc in result['rows']) logger.debug( "Total daily cost retrieved", date=target_date.isoformat(), total_cost=total_cost, total_documents=len(result['rows']) ) return total_cost except AppwriteException as e: logger.error( "Failed to get total daily cost", date=target_date.isoformat(), error=str(e), code=e.code ) raise def get_user_request_count_today(self, user_id: str) -> int: """ Get the number of AI requests a user has made today. Used for rate limiting checks. Args: user_id: User ID to check Returns: Number of requests made today Raises: AppwriteException: If query fails """ try: summary = self.get_daily_usage(user_id, date.today()) return summary.total_requests except AppwriteException: # If there's an error, return 0 to be safe (fail open) logger.warning( "Failed to get user request count, returning 0", user_id=user_id ) return 0 def _calculate_cost(self, model: str, tokens_input: int, tokens_output: int) -> float: """ Calculate the estimated cost for an AI request. Args: model: Model identifier tokens_input: Number of input tokens tokens_output: Number of output tokens Returns: Estimated cost in USD """ # Get cost per 1K tokens for this model model_cost = MODEL_COSTS.get(model, DEFAULT_COST) # Calculate cost (costs are per 1K tokens) input_cost = (tokens_input / 1000) * model_cost["input"] output_cost = (tokens_output / 1000) * model_cost["output"] total_cost = input_cost + output_cost return round(total_cost, 6) # Round to 6 decimal places @staticmethod def estimate_cost_for_model(model: str, tokens_input: int, tokens_output: int) -> float: """ Static method to estimate cost without needing a service instance. Useful for pre-calculation and UI display. Args: model: Model identifier tokens_input: Number of input tokens tokens_output: Number of output tokens Returns: Estimated cost in USD """ model_cost = MODEL_COSTS.get(model, DEFAULT_COST) input_cost = (tokens_input / 1000) * model_cost["input"] output_cost = (tokens_output / 1000) * model_cost["output"] return round(input_cost + output_cost, 6) @staticmethod def get_model_cost_info(model: str) -> Dict[str, float]: """ Get cost information for a model. Args: model: Model identifier Returns: Dictionary with 'input' and 'output' cost per 1K tokens """ return MODEL_COSTS.get(model, DEFAULT_COST)