Files
Code_of_Conquest/api/app/services/usage_tracking_service.py
2025-11-24 23:10:55 -06:00

529 lines
17 KiB
Python

"""
Usage Tracking Service for AI cost and usage monitoring.
This service tracks all AI usage events, calculates costs, and provides
analytics for monitoring and rate limiting purposes.
Usage:
from app.services.usage_tracking_service import UsageTrackingService
tracker = UsageTrackingService()
# Log a usage event
tracker.log_usage(
user_id="user_123",
model="anthropic/claude-3.5-sonnet",
tokens_input=100,
tokens_output=350,
task_type=TaskType.STORY_PROGRESSION
)
# Get daily usage
usage = tracker.get_daily_usage("user_123", date.today())
print(f"Total requests: {usage.total_requests}")
print(f"Estimated cost: ${usage.estimated_cost:.4f}")
"""
import os
from datetime import datetime, timezone, date, timedelta
from typing import Dict, Any, List, Optional
from uuid import uuid4
from appwrite.client import Client
from appwrite.services.tables_db import TablesDB
from appwrite.exception import AppwriteException
from appwrite.id import ID
from appwrite.query import Query
from app.utils.logging import get_logger
from app.models.ai_usage import (
AIUsageLog,
DailyUsageSummary,
MonthlyUsageSummary,
TaskType
)
logger = get_logger(__file__)
# Estimated price table, in USD per 1,000 tokens, keyed by model identifier.
# Each entry carries a separate rate for prompt ("input") tokens and
# completion ("output") tokens. Figures are estimates based on Replicate pricing.
MODEL_COSTS = {
    # --- Llama models (via Replicate): very cheap ---
    "meta/meta-llama-3-8b-instruct": {"input": 0.0001, "output": 0.0001},
    "meta/meta-llama-3-70b-instruct": {"input": 0.0006, "output": 0.0006},
    # --- Claude models (via Replicate) ---
    "anthropic/claude-3.5-haiku": {"input": 0.001, "output": 0.005},
    "anthropic/claude-3-haiku": {"input": 0.00025, "output": 0.00125},
    "anthropic/claude-3.5-sonnet": {"input": 0.003, "output": 0.015},
    "anthropic/claude-4.5-sonnet": {"input": 0.003, "output": 0.015},
    "anthropic/claude-3-opus": {"input": 0.015, "output": 0.075},
}

# Fallback rates applied when a model identifier is not listed in MODEL_COSTS.
DEFAULT_COST = {"input": 0.001, "output": 0.005}
class UsageTrackingService:
    """
    Service for tracking AI usage and calculating costs.

    This service provides:
    - Logging individual AI usage events to Appwrite
    - Calculating estimated costs based on model pricing
    - Retrieving daily and monthly usage summaries
    - Analytics for monitoring and rate limiting

    The service stores usage logs in an Appwrite table named 'ai_usage_logs'.
    All day/month windows are computed in UTC, matching the UTC timestamps
    written by log_usage().
    """

    # Table ID for usage logs
    COLLECTION_ID = "ai_usage_logs"

    # Maximum number of rows fetched per query. Results beyond these caps are
    # not aggregated; a warning is logged whenever a cap is reached.
    _DAILY_USER_ROW_CAP = 1000
    _MONTHLY_USER_ROW_CAP = 5000
    _DAILY_TOTAL_ROW_CAP = 10000

    def __init__(self):
        """
        Initialize the usage tracking service.

        Reads configuration from environment variables:
        - APPWRITE_ENDPOINT: Appwrite API endpoint
        - APPWRITE_PROJECT_ID: Appwrite project ID
        - APPWRITE_API_KEY: Appwrite API key
        - APPWRITE_DATABASE_ID: Appwrite database ID (defaults to 'main')

        Raises:
            ValueError: If the endpoint, project ID or API key is missing
        """
        self.endpoint = os.getenv('APPWRITE_ENDPOINT')
        self.project_id = os.getenv('APPWRITE_PROJECT_ID')
        self.api_key = os.getenv('APPWRITE_API_KEY')
        self.database_id = os.getenv('APPWRITE_DATABASE_ID', 'main')

        if not all([self.endpoint, self.project_id, self.api_key]):
            logger.error("Missing Appwrite configuration in environment variables")
            raise ValueError("Appwrite configuration incomplete. Check APPWRITE_* environment variables.")

        # Initialize Appwrite client
        self.client = Client()
        self.client.set_endpoint(self.endpoint)
        self.client.set_project(self.project_id)
        self.client.set_key(self.api_key)

        # Initialize TablesDB service
        self.tables_db = TablesDB(self.client)

        logger.info("UsageTrackingService initialized", database_id=self.database_id)

    def log_usage(
        self,
        user_id: str,
        model: str,
        tokens_input: int,
        tokens_output: int,
        task_type: TaskType,
        session_id: Optional[str] = None,
        character_id: Optional[str] = None,
        request_duration_ms: int = 0,
        success: bool = True,
        error_message: Optional[str] = None
    ) -> AIUsageLog:
        """
        Log an AI usage event.

        Creates a new usage log row in Appwrite containing the request
        details, the token totals and the estimated cost. The row is stamped
        with the current UTC time.

        Args:
            user_id: User who made the request
            model: Model identifier (e.g., "anthropic/claude-3.5-sonnet")
            tokens_input: Number of input tokens (prompt)
            tokens_output: Number of output tokens (response)
            task_type: Type of task (story, combat, quest, npc)
            session_id: Optional game session ID
            character_id: Optional character ID
            request_duration_ms: Request duration in milliseconds
            success: Whether the request succeeded
            error_message: Error message if failed

        Returns:
            AIUsageLog with the logged data

        Raises:
            AppwriteException: If storage fails
        """
        tokens_total = tokens_input + tokens_output
        estimated_cost = self._calculate_cost(model, tokens_input, tokens_output)

        # The same generated ID is used for the log entry and the Appwrite row
        log_id = str(uuid4())

        usage_log = AIUsageLog(
            log_id=log_id,
            user_id=user_id,
            timestamp=datetime.now(timezone.utc),
            model=model,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            tokens_total=tokens_total,
            estimated_cost=estimated_cost,
            task_type=task_type,
            session_id=session_id,
            character_id=character_id,
            request_duration_ms=request_duration_ms,
            success=success,
            error_message=error_message,
        )

        try:
            # Store in Appwrite (the response is not needed)
            self.tables_db.create_row(
                database_id=self.database_id,
                table_id=self.COLLECTION_ID,
                row_id=log_id,
                data=usage_log.to_dict()
            )

            logger.info(
                "AI usage logged",
                log_id=log_id,
                user_id=user_id,
                model=model,
                tokens_total=tokens_total,
                estimated_cost=estimated_cost,
                task_type=task_type.value,
                success=success
            )

            return usage_log

        except AppwriteException as e:
            logger.error(
                "Failed to log AI usage",
                user_id=user_id,
                model=model,
                error=str(e),
                code=e.code
            )
            raise

    @staticmethod
    def _utc_day_range(target_date: date) -> tuple:
        """Return (start, next_start) UTC datetimes bounding target_date as a half-open range."""
        start = datetime.combine(target_date, datetime.min.time()).replace(tzinfo=timezone.utc)
        return start, start + timedelta(days=1)

    @staticmethod
    def _number(row: Dict[str, Any], key: str) -> Any:
        """Return row[key], treating a missing or null value as 0."""
        return row.get(key) or 0

    def _fetch_rows(self, queries: List[Any], row_cap: int, **log_context: Any) -> List[Dict[str, Any]]:
        """
        Run a list query against the usage log table, limited to row_cap rows.

        Logs a warning when the number of returned rows reaches the cap,
        because any aggregate built from the result may then be understated.

        Raises:
            AppwriteException: If the query fails
        """
        result = self.tables_db.list_rows(
            database_id=self.database_id,
            table_id=self.COLLECTION_ID,
            queries=[*queries, Query.limit(row_cap)]
        )
        rows = result['rows']

        if len(rows) >= row_cap:
            logger.warning(
                "Usage query reached row cap; totals may be understated",
                row_cap=row_cap,
                **log_context
            )

        return rows

    def get_daily_usage(self, user_id: str, target_date: date) -> DailyUsageSummary:
        """
        Get AI usage summary for a specific day (UTC).

        Args:
            user_id: User ID to get usage for
            target_date: Date to get usage for, interpreted as a UTC calendar day

        Returns:
            DailyUsageSummary with aggregated usage data (at most 1000 log
            rows are aggregated)

        Raises:
            AppwriteException: If query fails
        """
        try:
            # Half-open range [start of day, start of next day) in UTC
            start_of_day, start_of_next_day = self._utc_day_range(target_date)

            rows = self._fetch_rows(
                [
                    Query.equal("user_id", user_id),
                    Query.greater_than_equal("timestamp", start_of_day.isoformat()),
                    Query.less_than("timestamp", start_of_next_day.isoformat()),
                ],
                self._DAILY_USER_ROW_CAP,
                user_id=user_id,
                date=target_date.isoformat()
            )

            # Aggregate the data
            total_requests = 0
            total_tokens = 0
            total_input_tokens = 0
            total_output_tokens = 0
            total_cost = 0.0
            requests_by_task: Dict[str, int] = {}

            for doc in rows:
                total_requests += 1
                total_tokens += self._number(doc, 'tokens_total')
                total_input_tokens += self._number(doc, 'tokens_input')
                total_output_tokens += self._number(doc, 'tokens_output')
                total_cost += self._number(doc, 'estimated_cost')

                # Rows without a task type are counted under 'general'
                task_type = doc.get('task_type') or 'general'
                requests_by_task[task_type] = requests_by_task.get(task_type, 0) + 1

            summary = DailyUsageSummary(
                date=target_date,
                user_id=user_id,
                total_requests=total_requests,
                total_tokens=total_tokens,
                total_input_tokens=total_input_tokens,
                total_output_tokens=total_output_tokens,
                estimated_cost=total_cost,
                requests_by_task=requests_by_task
            )

            logger.debug(
                "Daily usage retrieved",
                user_id=user_id,
                date=target_date.isoformat(),
                total_requests=total_requests,
                estimated_cost=total_cost
            )

            return summary

        except AppwriteException as e:
            logger.error(
                "Failed to get daily usage",
                user_id=user_id,
                date=target_date.isoformat(),
                error=str(e),
                code=e.code
            )
            raise

    def get_monthly_cost(self, user_id: str, year: int, month: int) -> MonthlyUsageSummary:
        """
        Get AI usage cost summary for a specific month (UTC).

        Args:
            user_id: User ID to get cost for
            year: Year (e.g., 2025)
            month: Month (1-12)

        Returns:
            MonthlyUsageSummary with aggregated cost data (at most 5000 log
            rows are aggregated)

        Raises:
            AppwriteException: If query fails
            ValueError: If month is invalid
        """
        if not 1 <= month <= 12:
            raise ValueError(f"Invalid month: {month}. Must be 1-12.")

        try:
            # Half-open range [first of month, first of next month) in UTC.
            # An exclusive upper bound keeps rows stamped in the final
            # fraction of a second of the month inside the range.
            start_of_month = datetime(year, month, 1, tzinfo=timezone.utc)
            if month == 12:
                start_of_next_month = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
            else:
                start_of_next_month = datetime(year, month + 1, 1, tzinfo=timezone.utc)

            rows = self._fetch_rows(
                [
                    Query.equal("user_id", user_id),
                    Query.greater_than_equal("timestamp", start_of_month.isoformat()),
                    Query.less_than("timestamp", start_of_next_month.isoformat()),
                ],
                self._MONTHLY_USER_ROW_CAP,
                user_id=user_id,
                year=year,
                month=month
            )

            # Aggregate the data
            total_requests = len(rows)
            total_tokens = sum(self._number(doc, 'tokens_total') for doc in rows)
            total_cost = sum((self._number(doc, 'estimated_cost') for doc in rows), 0.0)

            summary = MonthlyUsageSummary(
                year=year,
                month=month,
                user_id=user_id,
                total_requests=total_requests,
                total_tokens=total_tokens,
                estimated_cost=total_cost
            )

            logger.debug(
                "Monthly cost retrieved",
                user_id=user_id,
                year=year,
                month=month,
                total_requests=total_requests,
                estimated_cost=total_cost
            )

            return summary

        except AppwriteException as e:
            logger.error(
                "Failed to get monthly cost",
                user_id=user_id,
                year=year,
                month=month,
                error=str(e),
                code=e.code
            )
            raise

    def get_total_daily_cost(self, target_date: date) -> float:
        """
        Get the total AI cost across all users for a specific day (UTC).

        Used for admin monitoring and alerting.

        Args:
            target_date: Date to get cost for, interpreted as a UTC calendar day

        Returns:
            Total estimated cost in USD (at most 10000 log rows are summed)

        Raises:
            AppwriteException: If query fails
        """
        try:
            # Half-open range [start of day, start of next day) in UTC
            start_of_day, start_of_next_day = self._utc_day_range(target_date)

            rows = self._fetch_rows(
                [
                    Query.greater_than_equal("timestamp", start_of_day.isoformat()),
                    Query.less_than("timestamp", start_of_next_day.isoformat()),
                ],
                self._DAILY_TOTAL_ROW_CAP,
                date=target_date.isoformat()
            )

            # Sum up costs; the 0.0 start keeps the result a float for empty days
            total_cost = sum((self._number(doc, 'estimated_cost') for doc in rows), 0.0)

            logger.debug(
                "Total daily cost retrieved",
                date=target_date.isoformat(),
                total_cost=total_cost,
                total_documents=len(rows)
            )

            return total_cost

        except AppwriteException as e:
            logger.error(
                "Failed to get total daily cost",
                date=target_date.isoformat(),
                error=str(e),
                code=e.code
            )
            raise

    def get_user_request_count_today(self, user_id: str) -> int:
        """
        Get the number of AI requests a user has made today (UTC).

        Used for rate limiting checks. This method deliberately fails open:
        if the usage query fails, 0 is returned instead of raising.

        Args:
            user_id: User ID to check

        Returns:
            Number of requests made during the current UTC day, or 0 if the
            count could not be retrieved
        """
        # Logs are stamped in UTC and daily windows are UTC days, so "today"
        # must be the current UTC date rather than the server-local date.
        today_utc = datetime.now(timezone.utc).date()

        try:
            summary = self.get_daily_usage(user_id, today_utc)
            return summary.total_requests
        except AppwriteException:
            # If there's an error, return 0 to be safe (fail open)
            logger.warning(
                "Failed to get user request count, returning 0",
                user_id=user_id
            )
            return 0

    def _calculate_cost(self, model: str, tokens_input: int, tokens_output: int) -> float:
        """
        Calculate the estimated cost for an AI request.

        Delegates to estimate_cost_for_model() so that logged costs and
        pre-calculated estimates always use the same formula.

        Args:
            model: Model identifier
            tokens_input: Number of input tokens
            tokens_output: Number of output tokens

        Returns:
            Estimated cost in USD
        """
        return self.estimate_cost_for_model(model, tokens_input, tokens_output)

    @staticmethod
    def estimate_cost_for_model(model: str, tokens_input: int, tokens_output: int) -> float:
        """
        Static method to estimate cost without needing a service instance.

        Useful for pre-calculation and UI display. Models that are not in
        MODEL_COSTS are priced at DEFAULT_COST.

        Args:
            model: Model identifier
            tokens_input: Number of input tokens
            tokens_output: Number of output tokens

        Returns:
            Estimated cost in USD, rounded to 6 decimal places
        """
        model_cost = MODEL_COSTS.get(model, DEFAULT_COST)

        # Rates are expressed per 1K tokens
        input_cost = (tokens_input / 1000) * model_cost["input"]
        output_cost = (tokens_output / 1000) * model_cost["output"]

        return round(input_cost + output_cost, 6)

    @staticmethod
    def get_model_cost_info(model: str) -> Dict[str, float]:
        """
        Get cost information for a model.

        Args:
            model: Model identifier

        Returns:
            Dictionary with 'input' and 'output' cost per 1K tokens. A copy
            is returned so callers cannot alter the shared pricing table.
        """
        return dict(MODEL_COSTS.get(model, DEFAULT_COST))