Files
Code_of_Conquest/api/app/services/database_init.py
2025-11-24 23:10:55 -06:00

710 lines
23 KiB
Python

"""
Database Initialization Service.
This service handles programmatic creation of Appwrite database tables,
including schema definition, column creation, and index setup.
"""
import os
import time
from typing import List, Dict, Any, Optional
from appwrite.client import Client
from appwrite.services.tables_db import TablesDB
from appwrite.exception import AppwriteException
from app.utils.logging import get_logger
# Module-level logger used by every function and method in this file.
# NOTE(review): this passes __file__ (the module's path) rather than the more
# common __name__ -- confirm which of the two get_logger expects.
logger = get_logger(__file__)
class DatabaseInitService:
    """
    Service for initializing Appwrite database tables.

    This service provides methods to:
    - Create tables if they don't exist
    - Define table schemas (columns/attributes)
    - Create indexes for efficient querying

    NOTE(review): a table that already exists is treated as fully
    initialized; its columns and indexes are NOT verified. A table left
    half-built by an earlier failed run is therefore not repaired here.
    """

    # Seconds to wait after creating columns before creating indexes, giving
    # the columns time to propagate in Appwrite. Override on a subclass or an
    # instance to tune the delay.
    COLUMN_PROPAGATION_DELAY_SECONDS = 2

    # Maps each supported ``column_type`` to the TablesDB method creating it.
    _COLUMN_CREATORS = {
        'string': 'create_string_column',
        'integer': 'create_integer_column',
        'float': 'create_float_column',
        'boolean': 'create_boolean_column',
        'datetime': 'create_datetime_column',
        'email': 'create_email_column',
    }

    def __init__(self):
        """
        Initialize the database initialization service.

        Reads configuration from environment variables:
        - APPWRITE_ENDPOINT: Appwrite API endpoint
        - APPWRITE_PROJECT_ID: Appwrite project ID
        - APPWRITE_API_KEY: Appwrite API key
        - APPWRITE_DATABASE_ID: Appwrite database ID (defaults to 'main')

        Raises:
            ValueError: If the endpoint, project ID or API key is missing
        """
        self.endpoint = os.getenv('APPWRITE_ENDPOINT')
        self.project_id = os.getenv('APPWRITE_PROJECT_ID')
        self.api_key = os.getenv('APPWRITE_API_KEY')
        self.database_id = os.getenv('APPWRITE_DATABASE_ID', 'main')

        if not all([self.endpoint, self.project_id, self.api_key]):
            logger.error("Missing Appwrite configuration in environment variables")
            raise ValueError("Appwrite configuration incomplete. Check APPWRITE_* environment variables.")

        # Initialize Appwrite client
        self.client = Client()
        self.client.set_endpoint(self.endpoint)
        self.client.set_project(self.project_id)
        self.client.set_key(self.api_key)

        # Initialize TablesDB service
        self.tables_db = TablesDB(self.client)

        logger.info("DatabaseInitService initialized", database_id=self.database_id)

    def init_all_tables(self) -> Dict[str, bool]:
        """
        Initialize all application tables.

        Each table is initialized independently: a failure is logged and
        recorded as ``False`` without preventing the remaining tables from
        being attempted.

        Returns:
            Dictionary mapping table names to success status
        """
        # (result key, label used in the success log message, initializer)
        initializers = (
            ('characters', 'Characters', self.init_characters_table),
            ('game_sessions', 'Game sessions', self.init_game_sessions_table),
            ('ai_usage_logs', 'AI usage logs', self.init_ai_usage_logs_table),
        )

        results: Dict[str, bool] = {}
        logger.info("Initializing all database tables")

        for table_key, label, initializer in initializers:
            try:
                initializer()
                results[table_key] = True
                logger.info(f"{label} table initialized successfully")
            except Exception as e:
                # Top-level boundary: record the failure and keep going.
                logger.error(f"Failed to initialize {table_key} table", error=str(e))
                results[table_key] = False

        success_count = sum(1 for ok in results.values() if ok)
        total_count = len(results)
        logger.info("Table initialization complete",
                    success=success_count,
                    total=total_count,
                    results=results)
        return results

    def init_characters_table(self) -> bool:
        """
        Initialize the characters table.

        Table schema:
        - userId (string, required): Owner's user ID
        - characterData (string, required): JSON-serialized character data
        - is_active (boolean, default=True): Soft delete flag

        created_at / updated_at are not created as columns by this method.

        Indexes:
        - userId: For general user queries
        - userId + is_active: Composite index for efficiently fetching
          active characters

        Returns:
            True if the table already existed or was created successfully

        Raises:
            AppwriteException: If table creation fails
        """
        columns = [
            dict(column_id='userId', column_type='string', size=255, required=True),
            # Large text field for JSON data
            dict(column_id='characterData', column_type='string', size=65535, required=True),
            # Cannot be required if we want a default value
            dict(column_id='is_active', column_type='boolean', required=False, default=True),
        ]
        # Note: created_at and updated_at are auto-managed by DatabaseService
        # through the _parse_row method and timestamp updates

        indexes = {
            # Individual userId index for general user queries
            'idx_userId': ['userId'],
            # Composite index for the most common query pattern:
            # Query.equal('userId', user_id) + Query.equal('is_active', True)
            'idx_userId_is_active': ['userId', 'is_active'],
        }
        return self._init_table(
            table_id='characters',
            name='Characters',
            label='Characters',
            columns=columns,
            indexes=indexes,
        )

    def init_game_sessions_table(self) -> bool:
        """
        Initialize the game_sessions table.

        Table schema:
        - userId (string, required): Owner's user ID
        - characterId (string, required): Character ID for this session
        - sessionData (string, required): JSON-serialized session data
        - status (string, required): Session status (active, completed, abandoned)
        - sessionType (string, required): Session type (solo, multiplayer)

        Indexes:
        - userId: For user session queries
        - userId + status: For active session queries
        - characterId: For character session lookups

        Returns:
            True if the table already existed or was created successfully

        Raises:
            AppwriteException: If table creation fails
        """
        columns = [
            dict(column_id='userId', column_type='string', size=255, required=True),
            dict(column_id='characterId', column_type='string', size=255, required=True),
            # Large text field for JSON data
            dict(column_id='sessionData', column_type='string', size=65535, required=True),
            dict(column_id='status', column_type='string', size=50, required=True),
            dict(column_id='sessionType', column_type='string', size=50, required=True),
        ]
        indexes = {
            'idx_userId': ['userId'],
            'idx_userId_status': ['userId', 'status'],
            'idx_characterId': ['characterId'],
        }
        return self._init_table(
            table_id='game_sessions',
            name='Game Sessions',
            label='Game sessions',
            columns=columns,
            indexes=indexes,
        )

    def init_ai_usage_logs_table(self) -> bool:
        """
        Initialize the ai_usage_logs table for tracking AI API usage and costs.

        Table schema:
        - user_id (string, required): User who made the request
        - timestamp (string, required): ISO timestamp of the request
        - model (string, required): Model identifier
        - tokens_input (integer, required): Input token count
        - tokens_output (integer, required): Output token count
        - tokens_total (integer, required): Total token count
        - estimated_cost (float, required): Estimated cost in USD
        - task_type (string, required): Type of task
        - session_id (string, optional): Game session ID
        - character_id (string, optional): Character ID
        - request_duration_ms (integer, default=0): Request duration in milliseconds
        - success (boolean, default=True): Whether request succeeded
        - error_message (string, optional): Error message if failed

        Indexes:
        - user_id: For user usage queries
        - timestamp: For date range queries
        - user_id + timestamp: Composite for user date range queries

        Returns:
            True if the table already existed or was created successfully

        Raises:
            AppwriteException: If table creation fails
        """
        columns = [
            dict(column_id='user_id', column_type='string', size=255, required=True),
            # ISO timestamp format
            dict(column_id='timestamp', column_type='string', size=50, required=True),
            dict(column_id='model', column_type='string', size=255, required=True),
            dict(column_id='tokens_input', column_type='integer', required=True),
            dict(column_id='tokens_output', column_type='integer', required=True),
            dict(column_id='tokens_total', column_type='integer', required=True),
            dict(column_id='estimated_cost', column_type='float', required=True),
            dict(column_id='task_type', column_type='string', size=50, required=True),
            dict(column_id='session_id', column_type='string', size=255, required=False),
            dict(column_id='character_id', column_type='string', size=255, required=False),
            dict(column_id='request_duration_ms', column_type='integer', required=False, default=0),
            dict(column_id='success', column_type='boolean', required=False, default=True),
            dict(column_id='error_message', column_type='string', size=1000, required=False),
        ]
        indexes = {
            'idx_user_id': ['user_id'],
            'idx_timestamp': ['timestamp'],
            'idx_user_id_timestamp': ['user_id', 'timestamp'],
        }
        return self._init_table(
            table_id='ai_usage_logs',
            name='AI Usage Logs',
            label='AI usage logs',
            columns=columns,
            indexes=indexes,
        )

    def _init_table(
        self,
        table_id: str,
        name: str,
        label: str,
        columns: List[Dict[str, Any]],
        indexes: Dict[str, List[str]]
    ) -> bool:
        """
        Create one table with its columns and 'key' indexes if it is missing.

        Shared workflow behind the public ``init_*_table`` methods.

        Args:
            table_id: Table ID (also used in the log messages)
            name: Display name given to the table on creation
            label: Human-readable table label used in log messages
            columns: Keyword arguments for ``_create_column`` (without
                ``table_id``), one dict per column, in creation order
            indexes: Mapping of index ID to the column IDs it covers, in
                creation order; every index is created with type 'key'

        Returns:
            True if the table already existed or was created successfully

        Raises:
            AppwriteException: If any Appwrite call fails with something
                other than a 404 on lookup or a 409 on column/index creation
        """
        logger.info(f"Initializing {table_id} table", table_id=table_id)
        try:
            # Check if table already exists
            try:
                self.tables_db.get_table(
                    database_id=self.database_id,
                    table_id=table_id
                )
                # NOTE(review): existing tables are not checked for missing
                # columns or indexes.
                logger.info(f"{label} table already exists", table_id=table_id)
                return True
            except AppwriteException as e:
                # Only "not found" means we should go on and create it.
                if e.code != 404:
                    raise
                logger.info(f"{label} table does not exist, creating...")

            # Create table
            logger.info(f"Creating {table_id} table")
            table = self.tables_db.create_table(
                database_id=self.database_id,
                table_id=table_id,
                name=name
            )
            logger.info(f"{label} table created", table_id=table['$id'])

            # Create columns
            for column in columns:
                self._create_column(table_id=table_id, **column)

            # Wait for columns to fully propagate in Appwrite before creating indexes
            logger.info("Waiting for columns to propagate before creating indexes...")
            time.sleep(self.COLUMN_PROPAGATION_DELAY_SECONDS)

            # Create indexes for efficient querying
            for index_id, attributes in indexes.items():
                self._create_index(
                    table_id=table_id,
                    index_id=index_id,
                    index_type='key',
                    attributes=attributes
                )

            logger.info(f"{label} table initialized successfully", table_id=table_id)
            return True
        except AppwriteException as e:
            logger.error(f"Failed to initialize {table_id} table",
                         table_id=table_id,
                         error=str(e),
                         code=e.code)
            raise

    def _create_column(
        self,
        table_id: str,
        column_id: str,
        column_type: str,
        size: Optional[int] = None,
        required: bool = False,
        default: Optional[Any] = None,
        array: bool = False
    ) -> Dict[str, Any]:
        """
        Create a column in a table.

        Args:
            table_id: Table ID
            column_id: Column ID
            column_type: Column type (string, integer, float, boolean,
                datetime, email)
            size: Column size (for string types); forwarded only when given
            required: Whether column is required
            default: Default value; forwarded only when not None
            array: Whether column is an array

        Returns:
            Column creation response, or an empty dict if the column
            already exists (HTTP 409)

        Raises:
            ValueError: If ``column_type`` is not one of the supported types
            AppwriteException: If column creation fails for any reason
                other than the column already existing
        """
        try:
            logger.info("Creating column",
                        table_id=table_id,
                        column_id=column_id,
                        column_type=column_type)

            # Build column parameters (Appwrite SDK uses 'key' not 'column_id')
            params = {
                'database_id': self.database_id,
                'table_id': table_id,
                'key': column_id,
                'required': required,
                'array': array
            }
            if size is not None:
                params['size'] = size
            if default is not None:
                params['default'] = default

            # Pick the SDK method that matches the requested type
            creator_name = self._COLUMN_CREATORS.get(column_type)
            if creator_name is None:
                raise ValueError(f"Unsupported column type: {column_type}")
            result = getattr(self.tables_db, creator_name)(**params)

            logger.info("Column created successfully",
                        table_id=table_id,
                        column_id=column_id)
            return result
        except AppwriteException as e:
            # If column already exists, log warning but don't fail
            if e.code == 409:  # Conflict - column already exists
                logger.warning("Column already exists",
                               table_id=table_id,
                               column_id=column_id)
                return {}
            logger.error("Failed to create column",
                         table_id=table_id,
                         column_id=column_id,
                         error=str(e),
                         code=e.code)
            raise

    def _create_index(
        self,
        table_id: str,
        index_id: str,
        index_type: str,
        attributes: List[str],
        orders: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Create an index on a table.

        Args:
            table_id: Table ID
            index_id: Index ID
            index_type: Index type (key, fulltext, unique)
            attributes: List of column IDs to index
            orders: List of sort orders (ASC, DESC) for each attribute;
                when omitted or empty, 'ASC' is used for every attribute

        Returns:
            Index creation response, or an empty dict if the index already
            exists (HTTP 409)

        Raises:
            AppwriteException: If index creation fails for any reason other
                than the index already existing
        """
        try:
            logger.info("Creating index",
                        table_id=table_id,
                        index_id=index_id,
                        attributes=attributes)
            result = self.tables_db.create_index(
                database_id=self.database_id,
                table_id=table_id,
                key=index_id,
                type=index_type,
                columns=attributes,  # SDK uses 'columns', not 'attributes'
                orders=orders or ['ASC'] * len(attributes)
            )
            logger.info("Index created successfully",
                        table_id=table_id,
                        index_id=index_id)
            return result
        except AppwriteException as e:
            # If index already exists, log warning but don't fail
            if e.code == 409:  # Conflict - index already exists
                logger.warning("Index already exists",
                               table_id=table_id,
                               index_id=index_id)
                return {}
            logger.error("Failed to create index",
                         table_id=table_id,
                         index_id=index_id,
                         error=str(e),
                         code=e.code)
            raise
# Lazily-created module-wide service instance (None until first requested)
_init_service_instance: Optional[DatabaseInitService] = None


def get_database_init_service() -> DatabaseInitService:
    """
    Return the shared DatabaseInitService, creating it on first use.

    Returns:
        The module-level DatabaseInitService instance
    """
    global _init_service_instance
    # Fast path: the instance was already built by an earlier call.
    if _init_service_instance is not None:
        return _init_service_instance
    _init_service_instance = DatabaseInitService()
    return _init_service_instance
def init_database() -> Dict[str, bool]:
    """
    Initialize every application table via the shared service instance.

    Returns:
        Dictionary mapping table names to success status
    """
    return get_database_init_service().init_all_tables()