529 lines
17 KiB
Python
529 lines
17 KiB
Python
"""
|
|
Usage Tracking Service for AI cost and usage monitoring.
|
|
|
|
This service tracks all AI usage events, calculates costs, and provides
|
|
analytics for monitoring and rate limiting purposes.
|
|
|
|
Usage:
|
|
from app.services.usage_tracking_service import UsageTrackingService
|
|
|
|
tracker = UsageTrackingService()
|
|
|
|
# Log a usage event
|
|
tracker.log_usage(
|
|
user_id="user_123",
|
|
model="anthropic/claude-3.5-sonnet",
|
|
tokens_input=100,
|
|
tokens_output=350,
|
|
task_type=TaskType.STORY_PROGRESSION
|
|
)
|
|
|
|
# Get daily usage
|
|
usage = tracker.get_daily_usage("user_123", date.today())
|
|
print(f"Total requests: {usage.total_requests}")
|
|
print(f"Estimated cost: ${usage.estimated_cost:.4f}")
|
|
"""
|
|
|
|
import os
|
|
from datetime import datetime, timezone, date, timedelta
|
|
from typing import Dict, Any, List, Optional
|
|
from uuid import uuid4
|
|
|
|
from appwrite.client import Client
|
|
from appwrite.services.tables_db import TablesDB
|
|
from appwrite.exception import AppwriteException
|
|
from appwrite.id import ID
|
|
from appwrite.query import Query
|
|
|
|
from app.utils.logging import get_logger
|
|
from app.models.ai_usage import (
|
|
AIUsageLog,
|
|
DailyUsageSummary,
|
|
MonthlyUsageSummary,
|
|
TaskType
|
|
)
|
|
|
|
# Module-level logger, keyed by this file's path. Call sites in this module
# pass context as keyword arguments (e.g. user_id=..., model=...).
logger = get_logger(__file__)
|
|
|
|
|
|
# Estimated price in USD per 1,000 tokens, keyed by model identifier.
# The figures are approximations based on Replicate pricing.
MODEL_COSTS: Dict[str, Dict[str, float]] = {
    # Llama family (served via Replicate) - the cheapest tier
    "meta/meta-llama-3-8b-instruct": dict(input=0.0001, output=0.0001),
    "meta/meta-llama-3-70b-instruct": dict(input=0.0006, output=0.0006),
    # Claude family (served via Replicate)
    "anthropic/claude-3.5-haiku": dict(input=0.001, output=0.005),
    "anthropic/claude-3-haiku": dict(input=0.00025, output=0.00125),
    "anthropic/claude-3.5-sonnet": dict(input=0.003, output=0.015),
    "anthropic/claude-4.5-sonnet": dict(input=0.003, output=0.015),
    "anthropic/claude-3-opus": dict(input=0.015, output=0.075),
}

# Fallback per-1K-token pricing used for any model missing from MODEL_COSTS.
DEFAULT_COST: Dict[str, float] = dict(input=0.001, output=0.005)
|
|
|
|
|
|
class UsageTrackingService:
    """
    Service for tracking AI usage and calculating costs.

    This service provides:
    - Logging individual AI usage events to Appwrite
    - Calculating estimated costs based on model pricing
    - Retrieving daily and monthly usage summaries
    - Analytics for monitoring and rate limiting

    Usage logs are stored in the Appwrite table identified by COLLECTION_ID
    ('ai_usage_logs'). Day and month boundaries are always computed in UTC,
    matching the UTC timestamps written by log_usage().
    """

    # Table (collection) ID for usage logs
    COLLECTION_ID = "ai_usage_logs"

    def __init__(self):
        """
        Initialize the usage tracking service.

        Reads configuration from environment variables:
        - APPWRITE_ENDPOINT: Appwrite API endpoint (required)
        - APPWRITE_PROJECT_ID: Appwrite project ID (required)
        - APPWRITE_API_KEY: Appwrite API key (required)
        - APPWRITE_DATABASE_ID: Appwrite database ID (optional, defaults to 'main')

        Raises:
            ValueError: If any required environment variable is missing or empty
        """
        self.endpoint = os.getenv('APPWRITE_ENDPOINT')
        self.project_id = os.getenv('APPWRITE_PROJECT_ID')
        self.api_key = os.getenv('APPWRITE_API_KEY')
        self.database_id = os.getenv('APPWRITE_DATABASE_ID', 'main')

        if not all([self.endpoint, self.project_id, self.api_key]):
            logger.error("Missing Appwrite configuration in environment variables")
            raise ValueError("Appwrite configuration incomplete. Check APPWRITE_* environment variables.")

        # Initialize Appwrite client
        self.client = Client()
        self.client.set_endpoint(self.endpoint)
        self.client.set_project(self.project_id)
        self.client.set_key(self.api_key)

        # Initialize TablesDB service
        self.tables_db = TablesDB(self.client)

        logger.info("UsageTrackingService initialized", database_id=self.database_id)

    def log_usage(
        self,
        user_id: str,
        model: str,
        tokens_input: int,
        tokens_output: int,
        task_type: TaskType,
        session_id: Optional[str] = None,
        character_id: Optional[str] = None,
        request_duration_ms: int = 0,
        success: bool = True,
        error_message: Optional[str] = None
    ) -> AIUsageLog:
        """
        Log an AI usage event.

        Builds an AIUsageLog (including total tokens and estimated cost),
        stamps it with the current UTC time, and stores it as a new row in
        the usage table under a freshly generated UUID.

        Args:
            user_id: User who made the request
            model: Model identifier (e.g., "anthropic/claude-3.5-sonnet")
            tokens_input: Number of input tokens (prompt)
            tokens_output: Number of output tokens (response)
            task_type: Type of task (story, combat, quest, npc)
            session_id: Optional game session ID
            character_id: Optional character ID
            request_duration_ms: Request duration in milliseconds
            success: Whether the request succeeded
            error_message: Error message if failed

        Returns:
            AIUsageLog with the logged data

        Raises:
            AppwriteException: If storage fails
        """
        tokens_total = tokens_input + tokens_output
        estimated_cost = self._calculate_cost(model, tokens_input, tokens_output)

        # The same ID is used for the log record and the Appwrite row.
        log_id = str(uuid4())

        usage_log = AIUsageLog(
            log_id=log_id,
            user_id=user_id,
            timestamp=datetime.now(timezone.utc),
            model=model,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            tokens_total=tokens_total,
            estimated_cost=estimated_cost,
            task_type=task_type,
            session_id=session_id,
            character_id=character_id,
            request_duration_ms=request_duration_ms,
            success=success,
            error_message=error_message,
        )

        try:
            # Store in Appwrite
            self.tables_db.create_row(
                database_id=self.database_id,
                table_id=self.COLLECTION_ID,
                row_id=log_id,
                data=usage_log.to_dict()
            )

            logger.info(
                "AI usage logged",
                log_id=log_id,
                user_id=user_id,
                model=model,
                tokens_total=tokens_total,
                estimated_cost=estimated_cost,
                task_type=task_type.value,
                success=success
            )

            return usage_log

        except AppwriteException as e:
            logger.error(
                "Failed to log AI usage",
                user_id=user_id,
                model=model,
                error=str(e),
                code=e.code
            )
            raise

    def get_daily_usage(self, user_id: str, target_date: date) -> DailyUsageSummary:
        """
        Get AI usage summary for a specific day.

        At most 1000 log rows are read for the day; a warning is logged if
        that cap is reached, because the totals may then be incomplete.

        Args:
            user_id: User ID to get usage for
            target_date: Date to get usage for (interpreted as a UTC day)

        Returns:
            DailyUsageSummary with aggregated usage data

        Raises:
            AppwriteException: If query fails
        """
        try:
            start_of_day, start_of_next_day = self._utc_day_bounds(target_date)

            # Query usage logs for this user and date (capped at 1000 rows)
            rows = self._list_rows_between(
                start_of_day,
                start_of_next_day,
                1000,
                [Query.equal("user_id", user_id)],
            )

            # Aggregate the data. "or 0" makes a stored null count as zero
            # instead of raising TypeError on addition.
            total_requests = 0
            total_tokens = 0
            total_input_tokens = 0
            total_output_tokens = 0
            total_cost = 0.0
            requests_by_task: Dict[str, int] = {}

            for doc in rows:
                total_requests += 1
                total_tokens += doc.get('tokens_total', 0) or 0
                total_input_tokens += doc.get('tokens_input', 0) or 0
                total_output_tokens += doc.get('tokens_output', 0) or 0
                total_cost += doc.get('estimated_cost', 0.0) or 0.0

                task_type = doc.get('task_type', 'general')
                requests_by_task[task_type] = requests_by_task.get(task_type, 0) + 1

            summary = DailyUsageSummary(
                date=target_date,
                user_id=user_id,
                total_requests=total_requests,
                total_tokens=total_tokens,
                total_input_tokens=total_input_tokens,
                total_output_tokens=total_output_tokens,
                estimated_cost=total_cost,
                requests_by_task=requests_by_task
            )

            logger.debug(
                "Daily usage retrieved",
                user_id=user_id,
                date=target_date.isoformat(),
                total_requests=total_requests,
                estimated_cost=total_cost
            )

            return summary

        except AppwriteException as e:
            logger.error(
                "Failed to get daily usage",
                user_id=user_id,
                date=target_date.isoformat(),
                error=str(e),
                code=e.code
            )
            raise

    def get_monthly_cost(self, user_id: str, year: int, month: int) -> MonthlyUsageSummary:
        """
        Get AI usage cost summary for a specific month.

        At most 5000 log rows are read for the month; a warning is logged if
        that cap is reached, because the totals may then be incomplete.

        Args:
            user_id: User ID to get cost for
            year: Year (e.g., 2025)
            month: Month (1-12)

        Returns:
            MonthlyUsageSummary with aggregated cost data

        Raises:
            AppwriteException: If query fails
            ValueError: If month is invalid
        """
        if not 1 <= month <= 12:
            raise ValueError(f"Invalid month: {month}. Must be 1-12.")

        try:
            # Half-open range [start, next month start): every instant of the
            # month is covered, including the fractional part of its last second.
            start_of_month, start_of_next_month = self._month_bounds(year, month)

            # Query usage logs for this user and month (capped at 5000 rows)
            rows = self._list_rows_between(
                start_of_month,
                start_of_next_month,
                5000,
                [Query.equal("user_id", user_id)],
            )

            # Aggregate the data (stored nulls count as zero)
            total_requests = 0
            total_tokens = 0
            total_cost = 0.0

            for doc in rows:
                total_requests += 1
                total_tokens += doc.get('tokens_total', 0) or 0
                total_cost += doc.get('estimated_cost', 0.0) or 0.0

            summary = MonthlyUsageSummary(
                year=year,
                month=month,
                user_id=user_id,
                total_requests=total_requests,
                total_tokens=total_tokens,
                estimated_cost=total_cost
            )

            logger.debug(
                "Monthly cost retrieved",
                user_id=user_id,
                year=year,
                month=month,
                total_requests=total_requests,
                estimated_cost=total_cost
            )

            return summary

        except AppwriteException as e:
            logger.error(
                "Failed to get monthly cost",
                user_id=user_id,
                year=year,
                month=month,
                error=str(e),
                code=e.code
            )
            raise

    def get_total_daily_cost(self, target_date: date) -> float:
        """
        Get the total AI cost across all users for a specific day.

        Used for admin monitoring and alerting. At most 10000 log rows are
        read; a warning is logged if that cap is reached.

        Args:
            target_date: Date to get cost for (interpreted as a UTC day)

        Returns:
            Total estimated cost in USD (0.0 when there are no rows)

        Raises:
            AppwriteException: If query fails
        """
        try:
            start_of_day, start_of_next_day = self._utc_day_bounds(target_date)

            # Query all usage logs for this date, regardless of user
            rows = self._list_rows_between(start_of_day, start_of_next_day, 10000)

            # Sum up costs; the 0.0 start value keeps the result a float even
            # when there are no rows, and stored nulls count as zero.
            total_cost = sum(
                (doc.get('estimated_cost', 0.0) or 0.0 for doc in rows),
                0.0,
            )

            logger.debug(
                "Total daily cost retrieved",
                date=target_date.isoformat(),
                total_cost=total_cost,
                total_documents=len(rows)
            )

            return total_cost

        except AppwriteException as e:
            logger.error(
                "Failed to get total daily cost",
                date=target_date.isoformat(),
                error=str(e),
                code=e.code
            )
            raise

    def get_user_request_count_today(self, user_id: str) -> int:
        """
        Get the number of AI requests a user has made today.

        Used for rate limiting checks. "Today" is the current UTC date, so
        the window lines up with the UTC timestamps stored by log_usage()
        regardless of the server's local timezone.

        Args:
            user_id: User ID to check

        Returns:
            Number of requests made today, or 0 if the lookup fails
        """
        try:
            today_utc = datetime.now(timezone.utc).date()
            summary = self.get_daily_usage(user_id, today_utc)
            return summary.total_requests

        except AppwriteException:
            # If there's an error, return 0 to be safe (fail open)
            logger.warning(
                "Failed to get user request count, returning 0",
                user_id=user_id
            )
            return 0

    @staticmethod
    def _utc_day_bounds(target_date: date):
        """
        Return (start, end) datetimes bounding target_date as a UTC day.

        The range is half-open: start is midnight UTC of target_date and end
        is midnight UTC of the following day (exclusive).
        """
        start = datetime.combine(target_date, datetime.min.time()).replace(tzinfo=timezone.utc)
        return start, start + timedelta(days=1)

    @staticmethod
    def _month_bounds(year: int, month: int):
        """
        Return (start, end) datetimes bounding the given month in UTC.

        The range is half-open: start is the first instant of the month and
        end is the first instant of the following month (exclusive).
        """
        start = datetime(year, month, 1, 0, 0, 0, tzinfo=timezone.utc)
        if month == 12:
            # December rolls over into January of the next year.
            end = datetime(year + 1, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
        else:
            end = datetime(year, month + 1, 1, 0, 0, 0, tzinfo=timezone.utc)
        return start, end

    def _list_rows_between(
        self,
        start: datetime,
        end_exclusive: datetime,
        max_rows: int,
        extra_queries: Optional[List[Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Fetch usage log rows with start <= timestamp < end_exclusive.

        Args:
            start: Inclusive lower bound for the 'timestamp' column
            end_exclusive: Exclusive upper bound for the 'timestamp' column
            max_rows: Maximum number of rows to request
            extra_queries: Additional query filters placed before the range

        Returns:
            The 'rows' list from the Appwrite response

        Raises:
            AppwriteException: If the query fails
        """
        queries = list(extra_queries or [])
        queries.extend([
            Query.greater_than_equal("timestamp", start.isoformat()),
            Query.less_than("timestamp", end_exclusive.isoformat()),
            Query.limit(max_rows),
        ])

        result = self.tables_db.list_rows(
            database_id=self.database_id,
            table_id=self.COLLECTION_ID,
            queries=queries
        )
        rows = result['rows']

        # A full page means more matching rows may exist than were returned,
        # so any aggregate built from them could be an undercount.
        if len(rows) >= max_rows:
            logger.warning(
                "Usage query reached row cap; aggregates may be incomplete",
                table_id=self.COLLECTION_ID,
                max_rows=max_rows,
                range_start=start.isoformat(),
                range_end=end_exclusive.isoformat()
            )

        return rows

    def _calculate_cost(self, model: str, tokens_input: int, tokens_output: int) -> float:
        """
        Calculate the estimated cost for an AI request.

        Delegates to estimate_cost_for_model() so both entry points always
        use the same formula.

        Args:
            model: Model identifier
            tokens_input: Number of input tokens
            tokens_output: Number of output tokens

        Returns:
            Estimated cost in USD, rounded to 6 decimal places
        """
        return UsageTrackingService.estimate_cost_for_model(model, tokens_input, tokens_output)

    @staticmethod
    def estimate_cost_for_model(model: str, tokens_input: int, tokens_output: int) -> float:
        """
        Static method to estimate cost without needing a service instance.

        Useful for pre-calculation and UI display. Models missing from
        MODEL_COSTS are priced with DEFAULT_COST.

        Args:
            model: Model identifier
            tokens_input: Number of input tokens
            tokens_output: Number of output tokens

        Returns:
            Estimated cost in USD, rounded to 6 decimal places
        """
        model_cost = MODEL_COSTS.get(model, DEFAULT_COST)

        # Prices are quoted per 1K tokens.
        input_cost = (tokens_input / 1000) * model_cost["input"]
        output_cost = (tokens_output / 1000) * model_cost["output"]
        return round(input_cost + output_cost, 6)

    @staticmethod
    def get_model_cost_info(model: str) -> Dict[str, float]:
        """
        Get cost information for a model.

        Args:
            model: Model identifier

        Returns:
            Dictionary with 'input' and 'output' cost per 1K tokens. This is
            a copy, so callers may modify it without affecting the shared
            pricing tables.
        """
        return dict(MODEL_COSTS.get(model, DEFAULT_COST))
|