""" Database Service for Appwrite database operations. This service wraps the Appwrite Databases SDK to provide a clean interface for CRUD operations on collections. It handles JSON serialization, error handling, and provides structured logging. """ import os from typing import Dict, Any, List, Optional from datetime import datetime, timezone from dataclasses import dataclass from appwrite.client import Client from appwrite.services.tables_db import TablesDB from appwrite.exception import AppwriteException from appwrite.id import ID from app.utils.logging import get_logger logger = get_logger(__file__) @dataclass class DatabaseRow: """ Represents a row in an Appwrite table. Attributes: id: Row ID table_id: Table ID data: Row data (parsed from JSON) created_at: Creation timestamp updated_at: Last update timestamp """ id: str table_id: str data: Dict[str, Any] created_at: datetime updated_at: datetime def to_dict(self) -> Dict[str, Any]: """Convert row to dictionary.""" return { "id": self.id, "table_id": self.table_id, "data": self.data, "created_at": self.created_at.isoformat() if isinstance(self.created_at, datetime) else self.created_at, "updated_at": self.updated_at.isoformat() if isinstance(self.updated_at, datetime) else self.updated_at, } class DatabaseService: """ Service for interacting with Appwrite database tables. This service provides methods for: - Creating rows - Reading rows by ID or query - Updating rows - Deleting rows - Querying with filters """ def __init__(self): """ Initialize the database service. Reads configuration from environment variables: - APPWRITE_ENDPOINT: Appwrite API endpoint - APPWRITE_PROJECT_ID: Appwrite project ID - APPWRITE_API_KEY: Appwrite API key - APPWRITE_DATABASE_ID: Appwrite database ID """ self.endpoint = os.getenv('APPWRITE_ENDPOINT') self.project_id = os.getenv('APPWRITE_PROJECT_ID') self.api_key = os.getenv('APPWRITE_API_KEY') self.database_id = os.getenv('APPWRITE_DATABASE_ID', 'main') if not all([self.endpoint, self.project_id, self.api_key]): logger.error("Missing Appwrite configuration in environment variables") raise ValueError("Appwrite configuration incomplete. Check APPWRITE_* environment variables.") # Initialize Appwrite client self.client = Client() self.client.set_endpoint(self.endpoint) self.client.set_project(self.project_id) self.client.set_key(self.api_key) # Initialize TablesDB service self.tables_db = TablesDB(self.client) logger.info("DatabaseService initialized", database_id=self.database_id) def create_row( self, table_id: str, data: Dict[str, Any], row_id: Optional[str] = None, permissions: Optional[List[str]] = None ) -> DatabaseRow: """ Create a new row in a table. Args: table_id: Table ID (e.g., "characters") data: Row data (will be JSON-serialized if needed) row_id: Optional custom row ID (auto-generated if None) permissions: Optional permissions array Returns: DatabaseRow with created row Raises: AppwriteException: If creation fails """ try: logger.info("Creating row", table_id=table_id, has_custom_id=bool(row_id)) # Generate ID if not provided if row_id is None: row_id = ID.unique() # Create row (Appwrite manages timestamps automatically via $createdAt/$updatedAt) result = self.tables_db.create_row( database_id=self.database_id, table_id=table_id, row_id=row_id, data=data, permissions=permissions or [] ) logger.info("Row created successfully", table_id=table_id, row_id=result['$id']) return self._parse_row(result, table_id) except AppwriteException as e: logger.error("Failed to create row", table_id=table_id, error=str(e), code=e.code) raise def get_row(self, table_id: str, row_id: str) -> Optional[DatabaseRow]: """ Get a row by ID. Args: table_id: Table ID row_id: Row ID Returns: DatabaseRow or None if not found Raises: AppwriteException: If retrieval fails (except 404) """ try: logger.debug("Fetching row", table_id=table_id, row_id=row_id) result = self.tables_db.get_row( database_id=self.database_id, table_id=table_id, row_id=row_id ) return self._parse_row(result, table_id) except AppwriteException as e: if e.code == 404: logger.warning("Row not found", table_id=table_id, row_id=row_id) return None logger.error("Failed to fetch row", table_id=table_id, row_id=row_id, error=str(e), code=e.code) raise def update_row( self, table_id: str, row_id: str, data: Dict[str, Any], permissions: Optional[List[str]] = None ) -> DatabaseRow: """ Update an existing row. Args: table_id: Table ID row_id: Row ID data: New row data (partial updates supported) permissions: Optional permissions array Returns: DatabaseRow with updated row Raises: AppwriteException: If update fails """ try: logger.info("Updating row", table_id=table_id, row_id=row_id) # Update row (Appwrite manages timestamps automatically via $updatedAt) result = self.tables_db.update_row( database_id=self.database_id, table_id=table_id, row_id=row_id, data=data, permissions=permissions ) logger.info("Row updated successfully", table_id=table_id, row_id=row_id) return self._parse_row(result, table_id) except AppwriteException as e: logger.error("Failed to update row", table_id=table_id, row_id=row_id, error=str(e), code=e.code) raise def delete_row(self, table_id: str, row_id: str) -> bool: """ Delete a row. Args: table_id: Table ID row_id: Row ID Returns: True if deletion successful Raises: AppwriteException: If deletion fails """ try: logger.info("Deleting row", table_id=table_id, row_id=row_id) self.tables_db.delete_row( database_id=self.database_id, table_id=table_id, row_id=row_id ) logger.info("Row deleted successfully", table_id=table_id, row_id=row_id) return True except AppwriteException as e: logger.error("Failed to delete row", table_id=table_id, row_id=row_id, error=str(e), code=e.code) raise def list_rows( self, table_id: str, queries: Optional[List[str]] = None, limit: int = 25, offset: int = 0 ) -> List[DatabaseRow]: """ List rows in a table with optional filtering. Args: table_id: Table ID queries: Optional Appwrite query filters limit: Maximum rows to return (default 25, max 100) offset: Number of rows to skip Returns: List of DatabaseRow instances Raises: AppwriteException: If query fails """ try: logger.debug("Listing rows", table_id=table_id, has_queries=bool(queries), limit=limit, offset=offset) result = self.tables_db.list_rows( database_id=self.database_id, table_id=table_id, queries=queries or [] ) rows = [self._parse_row(row, table_id) for row in result['rows']] logger.debug("Rows listed successfully", table_id=table_id, count=len(rows), total=result.get('total', len(rows))) return rows except AppwriteException as e: logger.error("Failed to list rows", table_id=table_id, error=str(e), code=e.code) raise def count_rows(self, table_id: str, queries: Optional[List[str]] = None) -> int: """ Count rows in a table with optional filtering. Args: table_id: Table ID queries: Optional Appwrite query filters Returns: Row count Raises: AppwriteException: If query fails """ try: logger.debug("Counting rows", table_id=table_id, has_queries=bool(queries)) result = self.tables_db.list_rows( database_id=self.database_id, table_id=table_id, queries=queries or [] ) count = result.get('total', len(result.get('rows', []))) logger.debug("Rows counted", table_id=table_id, count=count) return count except AppwriteException as e: logger.error("Failed to count rows", table_id=table_id, error=str(e), code=e.code) raise def _parse_row(self, row: Dict[str, Any], table_id: str) -> DatabaseRow: """ Parse Appwrite row into DatabaseRow. Args: row: Appwrite row dictionary table_id: Table ID Returns: DatabaseRow instance """ # Extract metadata row_id = row['$id'] created_at = row.get('$createdAt', datetime.now(timezone.utc).isoformat()) updated_at = row.get('$updatedAt', datetime.now(timezone.utc).isoformat()) # Parse timestamps if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) if isinstance(updated_at, str): updated_at = datetime.fromisoformat(updated_at.replace('Z', '+00:00')) # Remove Appwrite metadata from data data = {k: v for k, v in row.items() if not k.startswith('$')} return DatabaseRow( id=row_id, table_id=table_id, data=data, created_at=created_at, updated_at=updated_at ) # Backward compatibility aliases (deprecated, use new methods) def create_document(self, collection_id: str, data: Dict[str, Any], document_id: Optional[str] = None, permissions: Optional[List[str]] = None) -> DatabaseRow: """Deprecated: Use create_row() instead.""" logger.warning("create_document() is deprecated, use create_row() instead") return self.create_row(collection_id, data, document_id, permissions) def get_document(self, collection_id: str, document_id: str) -> Optional[DatabaseRow]: """Deprecated: Use get_row() instead.""" logger.warning("get_document() is deprecated, use get_row() instead") return self.get_row(collection_id, document_id) def update_document(self, collection_id: str, document_id: str, data: Dict[str, Any], permissions: Optional[List[str]] = None) -> DatabaseRow: """Deprecated: Use update_row() instead.""" logger.warning("update_document() is deprecated, use update_row() instead") return self.update_row(collection_id, document_id, data, permissions) def delete_document(self, collection_id: str, document_id: str) -> bool: """Deprecated: Use delete_row() instead.""" logger.warning("delete_document() is deprecated, use delete_row() instead") return self.delete_row(collection_id, document_id) def list_documents(self, collection_id: str, queries: Optional[List[str]] = None, limit: int = 25, offset: int = 0) -> List[DatabaseRow]: """Deprecated: Use list_rows() instead.""" logger.warning("list_documents() is deprecated, use list_rows() instead") return self.list_rows(collection_id, queries, limit, offset) def count_documents(self, collection_id: str, queries: Optional[List[str]] = None) -> int: """Deprecated: Use count_rows() instead.""" logger.warning("count_documents() is deprecated, use count_rows() instead") return self.count_rows(collection_id, queries) # Backward compatibility alias DatabaseDocument = DatabaseRow # Global instance for convenience _service_instance: Optional[DatabaseService] = None def get_database_service() -> DatabaseService: """ Get the global DatabaseService instance. Returns: Singleton DatabaseService instance """ global _service_instance if _service_instance is None: _service_instance = DatabaseService() return _service_instance