""" Character Service - CRUD operations for player characters. This service handles creating, reading, updating, and deleting characters, with enforcement of tier-based character limits, skill unlock validation, and integration with Appwrite database. """ import json from typing import List, Optional, Dict, Any from datetime import datetime from appwrite.query import Query from appwrite.exception import AppwriteException from appwrite.id import ID from app.models.character import Character from app.models.skills import PlayerClass from app.models.origins import Origin from app.services.database_service import get_database_service from app.services.appwrite_service import AppwriteService from app.services.class_loader import get_class_loader from app.services.origin_service import get_origin_service from app.utils.logging import get_logger logger = get_logger(__file__) # Character limits by tier CHARACTER_LIMITS = { 'free': 1, 'basic': 3, 'premium': 5, 'elite': 10 } class CharacterLimitExceeded(Exception): """Raised when user tries to create more characters than their tier allows.""" pass class CharacterNotFound(Exception): """Raised when character ID doesn't exist or user doesn't own it.""" pass class SkillUnlockError(Exception): """Raised when skill unlock validation fails.""" pass class InsufficientGold(Exception): """Raised when character doesn't have enough gold for an operation.""" pass class CharacterService: """ Service for managing player characters. This service provides: - Character creation with tier limit enforcement - Character retrieval (single and list) - Character deletion - Skill unlock/respec functionality """ def __init__(self): """Initialize the character service with dependencies.""" self.db = get_database_service() self.appwrite = AppwriteService() self.class_loader = get_class_loader() self.origin_service = get_origin_service() self.collection_id = "characters" logger.info("CharacterService initialized") def create_character( self, user_id: str, name: str, class_id: str, origin_id: str ) -> Character: """ Create a new character for a user. This method: 1. Validates user hasn't exceeded tier character limit 2. Loads class and origin data 3. Creates character with default starting state 4. Stores in Appwrite database Args: user_id: Owner's user ID (from Appwrite auth) name: Character name class_id: PlayerClass ID (e.g., "warrior", "arcanist") origin_id: Origin ID (e.g., "soul_revenant") Returns: Created Character instance Raises: CharacterLimitExceeded: If user has reached their tier limit ValueError: If class or origin not found AppwriteException: If database operation fails """ try: logger.info("Creating character", user_id=user_id, name=name, class_id=class_id, origin_id=origin_id) # Check character limit for user's tier tier = self.appwrite.get_user_tier(user_id) current_count = self.count_user_characters(user_id) limit = CHARACTER_LIMITS.get(tier, 1) if current_count >= limit: logger.warning("Character limit exceeded", user_id=user_id, tier=tier, current=current_count, limit=limit) raise CharacterLimitExceeded( f"Character limit reached for {tier} tier ({current_count}/{limit}). " f"Upgrade your subscription to create more characters." ) # Load class and origin data player_class = self.class_loader.load_class(class_id) if not player_class: raise ValueError(f"Class not found: {class_id}") origin = self.origin_service.load_origin(origin_id) if not origin: raise ValueError(f"Origin not found: {origin_id}") # Generate unique character ID character_id = ID.unique() # Determine starting location - use location system if available from app.services.location_loader import get_location_loader location_loader = get_location_loader() starting_locations = location_loader.get_starting_locations() if starting_locations: # Use first starting location from location data (crossville_village) start_loc = starting_locations[0] starting_location_id = start_loc.location_id else: # Fallback to origin's starting location starting_location_id = origin.starting_location.id # Create character instance with starting state character = Character( character_id=character_id, user_id=user_id, name=name, player_class=player_class, origin=origin, level=1, experience=0, base_stats=player_class.base_stats.copy(), unlocked_skills=[], inventory=[], equipped={}, gold=0, active_quests=[], discovered_locations=[starting_location_id], # Initialize with starting location current_location=starting_location_id # Set starting location ) # Serialize character to JSON character_dict = character.to_dict() character_json = json.dumps(character_dict) # Store in database document_data = { 'userId': user_id, 'characterData': character_json, 'is_active': True } self.db.create_document( collection_id=self.collection_id, data=document_data, document_id=character_id ) logger.info("Character created successfully", character_id=character_id, user_id=user_id, class_id=class_id) return character except CharacterLimitExceeded: raise except Exception as e: logger.error("Failed to create character", user_id=user_id, error=str(e)) raise def get_character(self, character_id: str, user_id: str) -> Optional[Character]: """ Get a character by ID. Args: character_id: Character ID user_id: User ID (for ownership validation) Returns: Character instance or None if not found Raises: CharacterNotFound: If character doesn't exist or user doesn't own it """ try: logger.debug("Fetching character", character_id=character_id, user_id=user_id) # Get document from database document = self.db.get_row(self.collection_id, character_id) if not document: logger.warning("Character not found", character_id=character_id) raise CharacterNotFound(f"Character not found: {character_id}") # Verify ownership if document.data.get('userId') != user_id: logger.warning("Character ownership mismatch", character_id=character_id, expected_user=user_id, actual_user=document.data.get('userId')) raise CharacterNotFound(f"Character not found: {character_id}") # Parse character data character_json = document.data.get('characterData') character_dict = json.loads(character_json) character = Character.from_dict(character_dict) logger.debug("Character fetched successfully", character_id=character_id) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to fetch character", character_id=character_id, error=str(e)) raise def get_user_characters(self, user_id: str) -> List[Character]: """ Get all characters owned by a user. Args: user_id: User ID Returns: List of Character instances (may be empty) """ try: logger.debug("Fetching user characters", user_id=user_id) # Query for active characters owned by user queries = [ Query.equal('userId', user_id), Query.equal('is_active', True) ] documents = self.db.list_rows( table_id=self.collection_id, queries=queries, limit=100 # Max characters per user is 10 (elite tier) ) # Parse all character data characters = [] for document in documents: try: character_json = document.data.get('characterData') character_dict = json.loads(character_json) character = Character.from_dict(character_dict) characters.append(character) except Exception as e: logger.error("Failed to parse character", document_id=document.id, error=str(e)) continue logger.debug("User characters fetched", user_id=user_id, count=len(characters)) return characters except Exception as e: logger.error("Failed to fetch user characters", user_id=user_id, error=str(e)) raise def count_user_characters(self, user_id: str) -> int: """ Count active characters owned by a user. Args: user_id: User ID Returns: Number of active characters """ try: queries = [ Query.equal('userId', user_id), Query.equal('is_active', True) ] count = self.db.count_documents( collection_id=self.collection_id, queries=queries ) logger.debug("Character count", user_id=user_id, count=count) return count except Exception as e: logger.error("Failed to count characters", user_id=user_id, error=str(e)) return 0 def delete_character(self, character_id: str, user_id: str) -> bool: """ Delete a character (soft delete by marking inactive). Args: character_id: Character ID user_id: User ID (for ownership validation) Returns: True if deletion successful Raises: CharacterNotFound: If character doesn't exist or user doesn't own it """ try: logger.info("Deleting character", character_id=character_id, user_id=user_id) # Verify ownership first character = self.get_character(character_id, user_id) if not character: raise CharacterNotFound(f"Character not found: {character_id}") # Soft delete by marking inactive self.db.update_document( collection_id=self.collection_id, document_id=character_id, data={'is_active': False} ) logger.info("Character deleted successfully", character_id=character_id) return True except CharacterNotFound: raise except Exception as e: logger.error("Failed to delete character", character_id=character_id, error=str(e)) raise def unlock_skill(self, character_id: str, user_id: str, skill_id: str) -> Character: """ Unlock a skill for a character. This method: 1. Validates user owns the character 2. Validates skill exists in character's class 3. Validates prerequisites are met 4. Validates character has skill points available 5. Unlocks the skill Args: character_id: Character ID user_id: User ID (for ownership validation) skill_id: Skill ID to unlock Returns: Updated Character instance Raises: CharacterNotFound: If character doesn't exist or user doesn't own it SkillUnlockError: If skill unlock validation fails """ try: logger.info("Unlocking skill", character_id=character_id, skill_id=skill_id) # Get character character = self.get_character(character_id, user_id) # Check if skill already unlocked if skill_id in character.unlocked_skills: raise SkillUnlockError(f"Skill already unlocked: {skill_id}") # Get skill node from class all_skills = character.player_class.get_all_skills() skill_node = next((s for s in all_skills if s.skill_id == skill_id), None) if not skill_node: raise SkillUnlockError(f"Skill not found in class: {skill_id}") # Check prerequisites if skill_node.prerequisites: for prereq in skill_node.prerequisites: if prereq not in character.unlocked_skills: raise SkillUnlockError( f"Prerequisite not met: {prereq} required for {skill_id}" ) # Calculate available skill points (1 per level, minus already unlocked) available_points = character.level - len(character.unlocked_skills) if available_points <= 0: raise SkillUnlockError( f"No skill points available (Level {character.level}, " f"{len(character.unlocked_skills)} skills unlocked)" ) # Unlock skill character.unlocked_skills.append(skill_id) # Save to database self._save_character(character) logger.info("Skill unlocked successfully", character_id=character_id, skill_id=skill_id) return character except (CharacterNotFound, SkillUnlockError): raise except Exception as e: logger.error("Failed to unlock skill", character_id=character_id, skill_id=skill_id, error=str(e)) raise def respec_skills(self, character_id: str, user_id: str) -> Character: """ Reset all unlocked skills for a character. Cost: level × 100 gold Args: character_id: Character ID user_id: User ID (for ownership validation) Returns: Updated Character instance Raises: CharacterNotFound: If character doesn't exist or user doesn't own it InsufficientGold: If character can't afford respec """ try: logger.info("Respecing character skills", character_id=character_id) # Get character character = self.get_character(character_id, user_id) # Calculate cost respec_cost = character.level * 100 # Check gold if character.gold < respec_cost: raise InsufficientGold( f"Insufficient gold for respec. Cost: {respec_cost}, Available: {character.gold}" ) # Deduct gold character.gold -= respec_cost # Clear all unlocked skills character.unlocked_skills = [] # Save to database self._save_character(character) logger.info("Skills respeced successfully", character_id=character_id, cost=respec_cost) return character except (CharacterNotFound, InsufficientGold): raise except Exception as e: logger.error("Failed to respec skills", character_id=character_id, error=str(e)) raise def update_character(self, character: Character, user_id: str) -> Character: """ Update a character's data. Args: character: Character instance with updated data user_id: User ID (for ownership validation) Returns: Updated Character instance Raises: CharacterNotFound: If character doesn't exist or user doesn't own it """ try: logger.info("Updating character", character_id=character.character_id) # Verify ownership existing = self.get_character(character.character_id, user_id) if not existing: raise CharacterNotFound(f"Character not found: {character.character_id}") # Save to database self._save_character(character) logger.info("Character updated successfully", character_id=character.character_id) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to update character", character_id=character.character_id, error=str(e)) raise # ==================== Location and NPC Tracking ==================== def unlock_location( self, character_id: str, user_id: str, location_id: str ) -> Character: """ Add a location to character's discovered_locations. Args: character_id: Character ID user_id: User ID (for ownership validation) location_id: Location ID to unlock Returns: Updated Character instance Raises: CharacterNotFound: If character doesn't exist or user doesn't own it """ try: logger.info("Unlocking location", character_id=character_id, location_id=location_id) character = self.get_character(character_id, user_id) if location_id not in character.discovered_locations: character.discovered_locations.append(location_id) self._save_character(character) logger.info("Location unlocked", character_id=character_id, location_id=location_id) else: logger.debug("Location already unlocked", character_id=character_id, location_id=location_id) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to unlock location", character_id=character_id, location_id=location_id, error=str(e)) raise def update_npc_interaction( self, character_id: str, user_id: str, npc_id: str, interaction_data: Dict[str, Any] ) -> Character: """ Update NPC interaction state on character record. Args: character_id: Character ID user_id: User ID (for ownership validation) npc_id: NPC ID to update interaction for interaction_data: Dict containing interaction state fields Returns: Updated Character instance Raises: CharacterNotFound: If character doesn't exist or user doesn't own it """ try: logger.info("Updating NPC interaction", character_id=character_id, npc_id=npc_id) character = self.get_character(character_id, user_id) character.npc_interactions[npc_id] = interaction_data self._save_character(character) logger.info("NPC interaction updated", character_id=character_id, npc_id=npc_id, interaction_count=interaction_data.get('interaction_count', 0)) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to update NPC interaction", character_id=character_id, npc_id=npc_id, error=str(e)) raise def get_npc_interaction( self, character_id: str, user_id: str, npc_id: str ) -> Optional[Dict[str, Any]]: """ Get interaction state for a specific NPC. Args: character_id: Character ID user_id: User ID (for ownership validation) npc_id: NPC ID to get interaction for Returns: Interaction state dict or None if no interactions yet Raises: CharacterNotFound: If character doesn't exist or user doesn't own it """ try: character = self.get_character(character_id, user_id) return character.npc_interactions.get(npc_id) except CharacterNotFound: raise except Exception as e: logger.error("Failed to get NPC interaction", character_id=character_id, npc_id=npc_id, error=str(e)) raise def check_npc_secret_conditions( self, character: Character, npc: Any # NPC type from models.npc ) -> List[str]: """ Check which secrets the NPC would reveal based on character state. Evaluates the NPC's will_share_if conditions against the character's interaction state and returns a list of secrets that should be revealed. Args: character: Character instance npc: NPC instance with knowledge and will_share_if conditions Returns: List of secret strings that should be revealed this conversation """ if not npc.knowledge or not npc.knowledge.will_share_if: return [] interaction = character.npc_interactions.get(npc.npc_id, {}) revealed_indices = interaction.get("revealed_secrets", []) reveals = [] for i, condition in enumerate(npc.knowledge.will_share_if): # Skip already revealed secrets if i in revealed_indices: continue # Evaluate condition if self._evaluate_npc_condition(condition.condition, interaction): reveals.append(condition.reveals) logger.debug("Secret condition met", npc_id=npc.npc_id, condition_index=i, condition=condition.condition) return reveals def _evaluate_npc_condition( self, condition: str, interaction: Dict[str, Any] ) -> bool: """ Evaluate a condition string against interaction state. Supports simple condition patterns: - "interaction_count >= N" - "relationship_level >= N" - "custom_flags.key == true/false" Args: condition: Condition string to evaluate interaction: Character's interaction state with this NPC Returns: True if condition is met, False otherwise """ try: condition = condition.strip() # Pattern: interaction_count >= N if "interaction_count" in condition: if ">=" in condition: required = int(condition.split(">=")[1].strip()) return interaction.get("interaction_count", 0) >= required elif ">" in condition: required = int(condition.split(">")[1].strip()) return interaction.get("interaction_count", 0) > required # Pattern: relationship_level >= N if "relationship_level" in condition: if ">=" in condition: required = int(condition.split(">=")[1].strip()) return interaction.get("relationship_level", 50) >= required elif ">" in condition: required = int(condition.split(">")[1].strip()) return interaction.get("relationship_level", 50) > required # Pattern: custom_flags.key == true/false if "custom_flags." in condition: if "==" in condition: parts = condition.split("==") flag_path = parts[0].strip().replace("custom_flags.", "") expected_str = parts[1].strip().lower() expected = expected_str == "true" flags = interaction.get("custom_flags", {}) return flags.get(flag_path) == expected # Unknown condition pattern - log warning and return False logger.warning("Unknown condition pattern", condition=condition) return False except Exception as e: logger.error("Failed to evaluate condition", condition=condition, error=str(e)) return False def mark_secret_revealed( self, character_id: str, user_id: str, npc_id: str, secret_index: int ) -> Character: """ Mark a secret as revealed so it won't be revealed again. Args: character_id: Character ID user_id: User ID (for ownership validation) npc_id: NPC ID secret_index: Index of the secret in will_share_if list Returns: Updated Character instance """ try: character = self.get_character(character_id, user_id) interaction = character.npc_interactions.get(npc_id, {}) revealed = interaction.get("revealed_secrets", []) if secret_index not in revealed: revealed.append(secret_index) interaction["revealed_secrets"] = revealed character.npc_interactions[npc_id] = interaction self._save_character(character) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to mark secret revealed", character_id=character_id, npc_id=npc_id, secret_index=secret_index, error=str(e)) raise def set_npc_custom_flag( self, character_id: str, user_id: str, npc_id: str, flag_name: str, flag_value: Any ) -> Character: """ Set a custom flag on an NPC interaction (e.g., "helped_with_rats": true). Args: character_id: Character ID user_id: User ID (for ownership validation) npc_id: NPC ID flag_name: Name of the flag flag_value: Value to set Returns: Updated Character instance """ try: character = self.get_character(character_id, user_id) interaction = character.npc_interactions.get(npc_id, {}) custom_flags = interaction.get("custom_flags", {}) custom_flags[flag_name] = flag_value interaction["custom_flags"] = custom_flags character.npc_interactions[npc_id] = interaction self._save_character(character) logger.info("NPC custom flag set", character_id=character_id, npc_id=npc_id, flag_name=flag_name) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to set NPC custom flag", character_id=character_id, npc_id=npc_id, flag_name=flag_name, error=str(e)) raise def adjust_npc_relationship( self, character_id: str, user_id: str, npc_id: str, adjustment: int ) -> Character: """ Adjust relationship level with an NPC. Args: character_id: Character ID user_id: User ID (for ownership validation) npc_id: NPC ID adjustment: Amount to add/subtract from relationship (can be negative) Returns: Updated Character instance """ try: character = self.get_character(character_id, user_id) interaction = character.npc_interactions.get(npc_id, {}) current_level = interaction.get("relationship_level", 50) new_level = max(0, min(100, current_level + adjustment)) # Clamp 0-100 interaction["relationship_level"] = new_level character.npc_interactions[npc_id] = interaction self._save_character(character) logger.info("NPC relationship adjusted", character_id=character_id, npc_id=npc_id, old_level=current_level, new_level=new_level) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to adjust NPC relationship", character_id=character_id, npc_id=npc_id, error=str(e)) raise def add_npc_dialogue_exchange( self, character_id: str, user_id: str, npc_id: str, player_line: str, npc_response: str, max_history: int = 10 ) -> Character: """ Add a dialogue exchange to the NPC conversation history. Stores the player's message and NPC's response for context in future conversations. History is capped at max_history entries per NPC. Args: character_id: Character ID user_id: User ID (for ownership validation) npc_id: NPC ID player_line: What the player said npc_response: What the NPC responded max_history: Maximum number of exchanges to keep per NPC (default 10) Returns: Updated Character instance """ try: character = self.get_character(character_id, user_id) interaction = character.npc_interactions.get(npc_id, {}) dialogue_history = interaction.get("dialogue_history", []) # Add the new exchange exchange = { "player_line": player_line, "npc_response": npc_response } dialogue_history.append(exchange) # Trim to max_history (keep most recent) if len(dialogue_history) > max_history: dialogue_history = dialogue_history[-max_history:] interaction["dialogue_history"] = dialogue_history character.npc_interactions[npc_id] = interaction self._save_character(character) logger.debug("NPC dialogue exchange added", character_id=character_id, npc_id=npc_id, history_length=len(dialogue_history)) return character except CharacterNotFound: raise except Exception as e: logger.error("Failed to add NPC dialogue exchange", character_id=character_id, npc_id=npc_id, error=str(e)) raise def get_npc_dialogue_history( self, character_id: str, user_id: str, npc_id: str, limit: int = 5 ) -> List[Dict[str, str]]: """ Get recent dialogue history with an NPC from recent_messages cache. This method reads from character.npc_interactions[npc_id].recent_messages which contains the last 3 messages for quick AI context. For full conversation history, use ChatMessageService.get_conversation_history(). Backward Compatibility: Falls back to dialogue_history if recent_messages doesn't exist (for characters created before chat_messages system). Args: character_id: Character ID user_id: User ID (for ownership validation) npc_id: NPC ID limit: Maximum number of recent exchanges to return (default 5) Returns: List of dialogue exchanges [{player_message: str, npc_response: str, timestamp: str}, ...] OR legacy format [{player_line: str, npc_response: str}, ...] """ try: character = self.get_character(character_id, user_id) interaction = character.npc_interactions.get(npc_id, {}) # NEW: Try recent_messages first (last 3 messages cache) recent_messages = interaction.get("recent_messages") if recent_messages is not None: # Return most recent exchanges (up to limit, but recent_messages is already capped at 3) return recent_messages[-limit:] if recent_messages else [] # DEPRECATED: Fall back to dialogue_history for backward compatibility # This field will be removed after full migration to chat_messages system dialogue_history = interaction.get("dialogue_history", []) if dialogue_history: logger.debug("Using deprecated dialogue_history field", character_id=character_id, npc_id=npc_id) # Convert old format to new format if needed # Old format: {player_line, npc_response} # New format: {player_message, npc_response, timestamp} converted = [] for entry in dialogue_history[-limit:]: if "player_message" in entry: # Already new format converted.append(entry) else: # Old format, convert converted.append({ "player_message": entry.get("player_line", ""), "npc_response": entry.get("npc_response", ""), "timestamp": "" # No timestamp available in old format }) return converted # No dialogue history at all return [] except CharacterNotFound: raise except Exception as e: logger.error("Failed to get NPC dialogue history", character_id=character_id, npc_id=npc_id, error=str(e)) raise # ==================== End Location and NPC Tracking ==================== def _save_character(self, character: Character) -> None: """ Internal method to save character to database. Args: character: Character instance to save """ # Serialize character to JSON character_dict = character.to_dict() character_json = json.dumps(character_dict) # Update in database self.db.update_document( collection_id=self.collection_id, document_id=character.character_id, data={'characterData': character_json} ) # Global instance for convenience _service_instance: Optional[CharacterService] = None def get_character_service() -> CharacterService: """ Get the global CharacterService instance. Returns: Singleton CharacterService instance """ global _service_instance if _service_instance is None: _service_instance = CharacterService() return _service_instance