Files
Code_of_Conquest/api/app/services/character_service.py
2025-11-24 23:10:55 -06:00

1050 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Character Service - CRUD operations for player characters.
This service handles creating, reading, updating, and deleting characters,
with enforcement of tier-based character limits, skill unlock validation,
and integration with Appwrite database.
"""
import json
from typing import List, Optional, Dict, Any
from datetime import datetime
from appwrite.query import Query
from appwrite.exception import AppwriteException
from appwrite.id import ID
from app.models.character import Character
from app.models.skills import PlayerClass
from app.models.origins import Origin
from app.services.database_service import get_database_service
from app.services.appwrite_service import AppwriteService
from app.services.class_loader import get_class_loader
from app.services.origin_service import get_origin_service
from app.utils.logging import get_logger
logger = get_logger(__file__)
# Maximum number of active characters a user may own, keyed by subscription
# tier name. Callers fall back to a limit of 1 for tiers not listed here.
CHARACTER_LIMITS = {
    "free": 1,
    "basic": 3,
    "premium": 5,
    "elite": 10,
}
class CharacterLimitExceeded(Exception):
    """Raised when user tries to create more characters than their tier allows."""
class CharacterNotFound(Exception):
    """Raised when character ID doesn't exist or user doesn't own it."""
class SkillUnlockError(Exception):
    """Raised when skill unlock validation fails."""
class InsufficientGold(Exception):
    """Raised when character doesn't have enough gold for an operation."""
class CharacterService:
    """
    Service for managing player characters.

    This service provides:
    - Character creation with tier limit enforcement
    - Character retrieval (single and list)
    - Character deletion (soft delete)
    - Skill unlock/respec functionality
    - Location discovery and per-NPC interaction tracking

    Each character is stored as one document in the ``characters``
    collection: the serialized character lives in the ``characterData``
    field as a JSON string, next to ``userId`` and ``is_active``.
    """

    def __init__(self):
        """Initialize the character service with dependencies."""
        self.db = get_database_service()
        self.appwrite = AppwriteService()
        self.class_loader = get_class_loader()
        self.origin_service = get_origin_service()
        self.collection_id = "characters"
        logger.info("CharacterService initialized")

    # ==================== Core CRUD ====================

    def create_character(
        self,
        user_id: str,
        name: str,
        class_id: str,
        origin_id: str
    ) -> Character:
        """
        Create a new character for a user.

        This method:
        1. Validates user hasn't exceeded tier character limit
        2. Loads class and origin data
        3. Creates character with default starting state
        4. Stores in the database

        Args:
            user_id: Owner's user ID
            name: Character name
            class_id: PlayerClass ID (e.g., "warrior", "arcanist")
            origin_id: Origin ID (e.g., "soul_revenant")

        Returns:
            Created Character instance

        Raises:
            CharacterLimitExceeded: If user has reached their tier limit
            ValueError: If class or origin not found
            Exception: Any error from the loaders or the database layer is
                logged and re-raised unchanged
        """
        try:
            logger.info("Creating character",
                        user_id=user_id,
                        name=name,
                        class_id=class_id,
                        origin_id=origin_id)

            # Check character limit for user's tier (unknown tiers get 1 slot)
            tier = self.appwrite.get_user_tier(user_id)
            current_count = self.count_user_characters(user_id)
            limit = CHARACTER_LIMITS.get(tier, 1)
            if current_count >= limit:
                logger.warning("Character limit exceeded",
                               user_id=user_id,
                               tier=tier,
                               current=current_count,
                               limit=limit)
                raise CharacterLimitExceeded(
                    f"Character limit reached for {tier} tier ({current_count}/{limit}). "
                    f"Upgrade your subscription to create more characters."
                )

            # Load class and origin data
            player_class = self.class_loader.load_class(class_id)
            if not player_class:
                raise ValueError(f"Class not found: {class_id}")
            origin = self.origin_service.load_origin(origin_id)
            if not origin:
                raise ValueError(f"Origin not found: {origin_id}")

            # Generate unique character ID
            character_id = ID.unique()

            # Determine starting location - use location system if available.
            # Imported here rather than at module level, as in the original.
            from app.services.location_loader import get_location_loader
            location_loader = get_location_loader()
            starting_locations = location_loader.get_starting_locations()
            if starting_locations:
                # Use the first starting location from location data
                starting_location_id = starting_locations[0].location_id
            else:
                # Fallback to origin's starting location
                starting_location_id = origin.starting_location.id

            # Create character instance with starting state
            character = Character(
                character_id=character_id,
                user_id=user_id,
                name=name,
                player_class=player_class,
                origin=origin,
                level=1,
                experience=0,
                base_stats=player_class.base_stats.copy(),
                unlocked_skills=[],
                inventory=[],
                equipped={},
                gold=0,
                active_quests=[],
                discovered_locations=[starting_location_id],
                current_location=starting_location_id
            )

            # Serialize and store; the document ID doubles as the character ID
            character_json = json.dumps(character.to_dict())
            document_data = {
                'userId': user_id,
                'characterData': character_json,
                'is_active': True
            }
            self.db.create_document(
                collection_id=self.collection_id,
                data=document_data,
                document_id=character_id
            )

            logger.info("Character created successfully",
                        character_id=character_id,
                        user_id=user_id,
                        class_id=class_id)
            return character
        except CharacterLimitExceeded:
            # Already logged as a warning above; not an unexpected failure
            raise
        except Exception as e:
            logger.error("Failed to create character",
                         user_id=user_id,
                         error=str(e))
            raise

    def get_character(self, character_id: str, user_id: str) -> Optional[Character]:
        """
        Get a character by ID, validating ownership.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)

        Returns:
            Character instance. Every path in this method either returns a
            character or raises; the Optional annotation is kept only so the
            signature stays unchanged for callers.

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            logger.debug("Fetching character", character_id=character_id, user_id=user_id)

            # Get document from database
            document = self.db.get_row(self.collection_id, character_id)
            if not document:
                logger.warning("Character not found", character_id=character_id)
                raise CharacterNotFound(f"Character not found: {character_id}")

            # Verify ownership. The same error is raised as for a missing
            # document, so the response does not reveal that the ID exists.
            if document.data.get('userId') != user_id:
                logger.warning("Character ownership mismatch",
                               character_id=character_id,
                               expected_user=user_id,
                               actual_user=document.data.get('userId'))
                raise CharacterNotFound(f"Character not found: {character_id}")

            # NOTE(review): 'is_active' is not checked here, so a soft-deleted
            # character is still returned by ID - confirm this is intended.

            # Parse character data
            character_json = document.data.get('characterData')
            character_dict = json.loads(character_json)
            character = Character.from_dict(character_dict)

            logger.debug("Character fetched successfully", character_id=character_id)
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to fetch character",
                         character_id=character_id,
                         error=str(e))
            raise

    def get_user_characters(self, user_id: str) -> List[Character]:
        """
        Get all active characters owned by a user.

        Documents whose character data cannot be parsed are logged and
        skipped rather than failing the whole listing.

        Args:
            user_id: User ID

        Returns:
            List of Character instances (may be empty)
        """
        try:
            logger.debug("Fetching user characters", user_id=user_id)

            # Query for active characters owned by user
            queries = [
                Query.equal('userId', user_id),
                Query.equal('is_active', True)
            ]
            documents = self.db.list_rows(
                table_id=self.collection_id,
                queries=queries,
                limit=100  # Max characters per user is 10 (elite tier)
            )

            # Parse all character data
            characters = []
            for document in documents:
                try:
                    character_json = document.data.get('characterData')
                    character_dict = json.loads(character_json)
                    characters.append(Character.from_dict(character_dict))
                except Exception as e:
                    logger.error("Failed to parse character",
                                 document_id=document.id,
                                 error=str(e))
                    continue

            logger.debug("User characters fetched",
                         user_id=user_id,
                         count=len(characters))
            return characters
        except Exception as e:
            logger.error("Failed to fetch user characters",
                         user_id=user_id,
                         error=str(e))
            raise

    def count_user_characters(self, user_id: str) -> int:
        """
        Count active characters owned by a user.

        Args:
            user_id: User ID

        Returns:
            Number of active characters, or 0 if the count query fails
        """
        try:
            queries = [
                Query.equal('userId', user_id),
                Query.equal('is_active', True)
            ]
            count = self.db.count_documents(
                collection_id=self.collection_id,
                queries=queries
            )
            logger.debug("Character count", user_id=user_id, count=count)
            return count
        except Exception as e:
            # NOTE(review): returning 0 here means create_character's tier
            # limit check passes whenever the count query fails (fail-open).
            # Kept as-is for compatibility - confirm this is intended.
            logger.error("Failed to count characters", user_id=user_id, error=str(e))
            return 0

    def delete_character(self, character_id: str, user_id: str) -> bool:
        """
        Delete a character (soft delete by marking inactive).

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)

        Returns:
            True if deletion successful

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            logger.info("Deleting character", character_id=character_id, user_id=user_id)

            # Verify ownership first (get_character raises on mismatch)
            character = self.get_character(character_id, user_id)
            if not character:
                raise CharacterNotFound(f"Character not found: {character_id}")

            # Soft delete by marking inactive
            self.db.update_document(
                collection_id=self.collection_id,
                document_id=character_id,
                data={'is_active': False}
            )

            logger.info("Character deleted successfully", character_id=character_id)
            return True
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to delete character",
                         character_id=character_id,
                         error=str(e))
            raise

    def unlock_skill(self, character_id: str, user_id: str, skill_id: str) -> Character:
        """
        Unlock a skill for a character.

        This method:
        1. Validates user owns the character
        2. Validates skill exists in character's class
        3. Validates prerequisites are met
        4. Validates character has skill points available
        5. Unlocks the skill and saves the character

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            skill_id: Skill ID to unlock

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
            SkillUnlockError: If skill unlock validation fails
        """
        try:
            logger.info("Unlocking skill",
                        character_id=character_id,
                        skill_id=skill_id)

            # Get character
            character = self.get_character(character_id, user_id)

            # Check if skill already unlocked
            if skill_id in character.unlocked_skills:
                raise SkillUnlockError(f"Skill already unlocked: {skill_id}")

            # Get skill node from class
            all_skills = character.player_class.get_all_skills()
            skill_node = next((s for s in all_skills if s.skill_id == skill_id), None)
            if not skill_node:
                raise SkillUnlockError(f"Skill not found in class: {skill_id}")

            # Check prerequisites
            if skill_node.prerequisites:
                for prereq in skill_node.prerequisites:
                    if prereq not in character.unlocked_skills:
                        raise SkillUnlockError(
                            f"Prerequisite not met: {prereq} required for {skill_id}"
                        )

            # Calculate available skill points (1 per level, minus already unlocked)
            available_points = character.level - len(character.unlocked_skills)
            if available_points <= 0:
                raise SkillUnlockError(
                    f"No skill points available (Level {character.level}, "
                    f"{len(character.unlocked_skills)} skills unlocked)"
                )

            # Unlock skill
            character.unlocked_skills.append(skill_id)

            # Save to database
            self._save_character(character)

            logger.info("Skill unlocked successfully",
                        character_id=character_id,
                        skill_id=skill_id)
            return character
        except (CharacterNotFound, SkillUnlockError):
            raise
        except Exception as e:
            logger.error("Failed to unlock skill",
                         character_id=character_id,
                         skill_id=skill_id,
                         error=str(e))
            raise

    def respec_skills(self, character_id: str, user_id: str) -> Character:
        """
        Reset all unlocked skills for a character.

        Cost: level * 100 gold

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
            InsufficientGold: If character can't afford respec
        """
        try:
            logger.info("Respecing character skills", character_id=character_id)

            # Get character
            character = self.get_character(character_id, user_id)

            # Calculate cost
            respec_cost = character.level * 100

            # Check gold
            if character.gold < respec_cost:
                raise InsufficientGold(
                    f"Insufficient gold for respec. Cost: {respec_cost}, Available: {character.gold}"
                )

            # Deduct gold and clear all unlocked skills
            character.gold -= respec_cost
            character.unlocked_skills = []

            # Save to database
            self._save_character(character)

            logger.info("Skills respeced successfully",
                        character_id=character_id,
                        cost=respec_cost)
            return character
        except (CharacterNotFound, InsufficientGold):
            raise
        except Exception as e:
            logger.error("Failed to respec skills",
                         character_id=character_id,
                         error=str(e))
            raise

    def update_character(self, character: Character, user_id: str) -> Character:
        """
        Update a character's data.

        Args:
            character: Character instance with updated data
            user_id: User ID (for ownership validation)

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            logger.info("Updating character", character_id=character.character_id)

            # Verify ownership of the stored document before overwriting it
            existing = self.get_character(character.character_id, user_id)
            if not existing:
                raise CharacterNotFound(f"Character not found: {character.character_id}")

            # Save to database
            self._save_character(character)

            logger.info("Character updated successfully", character_id=character.character_id)
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to update character",
                         character_id=character.character_id,
                         error=str(e))
            raise

    # ==================== Location and NPC Tracking ====================

    def unlock_location(
        self,
        character_id: str,
        user_id: str,
        location_id: str
    ) -> Character:
        """
        Add a location to character's discovered_locations.

        The character is only saved when the location was not already
        discovered.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            location_id: Location ID to unlock

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            logger.info("Unlocking location",
                        character_id=character_id,
                        location_id=location_id)

            character = self.get_character(character_id, user_id)
            if location_id not in character.discovered_locations:
                character.discovered_locations.append(location_id)
                self._save_character(character)
                logger.info("Location unlocked",
                            character_id=character_id,
                            location_id=location_id)
            else:
                logger.debug("Location already unlocked",
                             character_id=character_id,
                             location_id=location_id)
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to unlock location",
                         character_id=character_id,
                         location_id=location_id,
                         error=str(e))
            raise

    def update_npc_interaction(
        self,
        character_id: str,
        user_id: str,
        npc_id: str,
        interaction_data: Dict[str, Any]
    ) -> Character:
        """
        Replace the stored interaction state for one NPC on the character.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            npc_id: NPC ID to update interaction for
            interaction_data: Dict containing interaction state fields; it
                overwrites any existing state for this NPC

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            logger.info("Updating NPC interaction",
                        character_id=character_id,
                        npc_id=npc_id)

            character = self.get_character(character_id, user_id)
            character.npc_interactions[npc_id] = interaction_data
            self._save_character(character)

            logger.info("NPC interaction updated",
                        character_id=character_id,
                        npc_id=npc_id,
                        interaction_count=interaction_data.get('interaction_count', 0))
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to update NPC interaction",
                         character_id=character_id,
                         npc_id=npc_id,
                         error=str(e))
            raise

    def get_npc_interaction(
        self,
        character_id: str,
        user_id: str,
        npc_id: str
    ) -> Optional[Dict[str, Any]]:
        """
        Get interaction state for a specific NPC.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            npc_id: NPC ID to get interaction for

        Returns:
            Interaction state dict or None if no interactions yet

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            character = self.get_character(character_id, user_id)
            return character.npc_interactions.get(npc_id)
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to get NPC interaction",
                         character_id=character_id,
                         npc_id=npc_id,
                         error=str(e))
            raise

    def check_npc_secret_conditions(
        self,
        character: Character,
        npc: Any  # NPC type from models.npc
    ) -> List[str]:
        """
        Check which secrets the NPC would reveal based on character state.

        Evaluates each of the NPC's will_share_if conditions against the
        character's interaction state with that NPC, skipping entries whose
        index is already listed in the interaction's "revealed_secrets".

        Args:
            character: Character instance
            npc: NPC instance with knowledge and will_share_if conditions

        Returns:
            List of the ``reveals`` values for conditions that are met
        """
        if not npc.knowledge or not npc.knowledge.will_share_if:
            return []

        interaction = character.npc_interactions.get(npc.npc_id, {})
        revealed_indices = interaction.get("revealed_secrets", [])

        reveals = []
        for i, condition in enumerate(npc.knowledge.will_share_if):
            # Skip already revealed secrets
            if i in revealed_indices:
                continue
            # Evaluate condition
            if self._evaluate_npc_condition(condition.condition, interaction):
                reveals.append(condition.reveals)
                logger.debug("Secret condition met",
                             npc_id=npc.npc_id,
                             condition_index=i,
                             condition=condition.condition)
        return reveals

    def _evaluate_npc_condition(
        self,
        condition: str,
        interaction: Dict[str, Any]
    ) -> bool:
        """
        Evaluate a condition string against interaction state.

        Supports simple condition patterns:
        - "interaction_count >= N" / "interaction_count > N"
          (missing count defaults to 0)
        - "relationship_level >= N" / "relationship_level > N"
          (missing level defaults to 50)
        - "custom_flags.key == true/false"

        Field names are detected by substring match. For custom flags, any
        expected value other than "true" (case-insensitive) is treated as
        False. Unknown patterns and evaluation errors are logged and yield
        False.

        Args:
            condition: Condition string to evaluate
            interaction: Character's interaction state with this NPC

        Returns:
            True if condition is met, False otherwise
        """
        try:
            condition = condition.strip()

            # Numeric threshold patterns: "<field> >= N" or "<field> > N".
            # ">=" must be tested before ">" because it contains it.
            for field, default in (("interaction_count", 0), ("relationship_level", 50)):
                if field not in condition:
                    continue
                if ">=" in condition:
                    required = int(condition.split(">=")[1].strip())
                    return interaction.get(field, default) >= required
                if ">" in condition:
                    required = int(condition.split(">")[1].strip())
                    return interaction.get(field, default) > required

            # Pattern: custom_flags.key == true/false
            if "custom_flags." in condition and "==" in condition:
                parts = condition.split("==")
                flag_path = parts[0].strip().replace("custom_flags.", "")
                expected = parts[1].strip().lower() == "true"
                flags = interaction.get("custom_flags", {})
                return flags.get(flag_path) == expected

            # Unknown condition pattern - log warning and return False
            logger.warning("Unknown condition pattern", condition=condition)
            return False
        except Exception as e:
            logger.error("Failed to evaluate condition",
                         condition=condition,
                         error=str(e))
            return False

    def mark_secret_revealed(
        self,
        character_id: str,
        user_id: str,
        npc_id: str,
        secret_index: int
    ) -> Character:
        """
        Mark a secret as revealed so it won't be revealed again.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            npc_id: NPC ID
            secret_index: Index of the secret in will_share_if list

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            character = self.get_character(character_id, user_id)
            interaction = character.npc_interactions.get(npc_id, {})
            revealed = interaction.get("revealed_secrets", [])
            if secret_index not in revealed:
                revealed.append(secret_index)
            interaction["revealed_secrets"] = revealed
            character.npc_interactions[npc_id] = interaction
            self._save_character(character)
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to mark secret revealed",
                         character_id=character_id,
                         npc_id=npc_id,
                         secret_index=secret_index,
                         error=str(e))
            raise

    def set_npc_custom_flag(
        self,
        character_id: str,
        user_id: str,
        npc_id: str,
        flag_name: str,
        flag_value: Any
    ) -> Character:
        """
        Set a custom flag on an NPC interaction (e.g., "helped_with_rats": true).

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            npc_id: NPC ID
            flag_name: Name of the flag
            flag_value: Value to set

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            character = self.get_character(character_id, user_id)
            interaction = character.npc_interactions.get(npc_id, {})
            custom_flags = interaction.get("custom_flags", {})
            custom_flags[flag_name] = flag_value
            interaction["custom_flags"] = custom_flags
            character.npc_interactions[npc_id] = interaction
            self._save_character(character)

            logger.info("NPC custom flag set",
                        character_id=character_id,
                        npc_id=npc_id,
                        flag_name=flag_name)
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to set NPC custom flag",
                         character_id=character_id,
                         npc_id=npc_id,
                         flag_name=flag_name,
                         error=str(e))
            raise

    def adjust_npc_relationship(
        self,
        character_id: str,
        user_id: str,
        npc_id: str,
        adjustment: int
    ) -> Character:
        """
        Adjust relationship level with an NPC.

        The level starts from 50 when none is stored and is clamped to the
        range 0-100 after the adjustment.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            npc_id: NPC ID
            adjustment: Amount to add/subtract from relationship (can be negative)

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            character = self.get_character(character_id, user_id)
            interaction = character.npc_interactions.get(npc_id, {})
            current_level = interaction.get("relationship_level", 50)
            new_level = max(0, min(100, current_level + adjustment))  # Clamp 0-100
            interaction["relationship_level"] = new_level
            character.npc_interactions[npc_id] = interaction
            self._save_character(character)

            logger.info("NPC relationship adjusted",
                        character_id=character_id,
                        npc_id=npc_id,
                        old_level=current_level,
                        new_level=new_level)
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to adjust NPC relationship",
                         character_id=character_id,
                         npc_id=npc_id,
                         error=str(e))
            raise

    def add_npc_dialogue_exchange(
        self,
        character_id: str,
        user_id: str,
        npc_id: str,
        player_line: str,
        npc_response: str,
        max_history: int = 10
    ) -> Character:
        """
        Add a dialogue exchange to the NPC conversation history.

        Stores the player's message and NPC's response for context in future
        conversations. History is capped at max_history entries per NPC,
        keeping the most recent ones.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            npc_id: NPC ID
            player_line: What the player said
            npc_response: What the NPC responded
            max_history: Maximum number of exchanges to keep per NPC
                (default 10); zero or negative keeps no history

        Returns:
            Updated Character instance

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            character = self.get_character(character_id, user_id)
            interaction = character.npc_interactions.get(npc_id, {})
            dialogue_history = interaction.get("dialogue_history", [])

            # Add the new exchange
            dialogue_history.append({
                "player_line": player_line,
                "npc_response": npc_response
            })

            # Trim to max_history (keep most recent). A non-positive cap is
            # handled explicitly: history[-0:] would return the whole list
            # and silently skip the trim.
            if max_history <= 0:
                dialogue_history = []
            elif len(dialogue_history) > max_history:
                dialogue_history = dialogue_history[-max_history:]

            interaction["dialogue_history"] = dialogue_history
            character.npc_interactions[npc_id] = interaction
            self._save_character(character)

            logger.debug("NPC dialogue exchange added",
                         character_id=character_id,
                         npc_id=npc_id,
                         history_length=len(dialogue_history))
            return character
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to add NPC dialogue exchange",
                         character_id=character_id,
                         npc_id=npc_id,
                         error=str(e))
            raise

    def get_npc_dialogue_history(
        self,
        character_id: str,
        user_id: str,
        npc_id: str,
        limit: int = 5
    ) -> List[Dict[str, str]]:
        """
        Get recent dialogue history with an NPC.

        Args:
            character_id: Character ID
            user_id: User ID (for ownership validation)
            npc_id: NPC ID
            limit: Maximum number of recent exchanges to return (default 5);
                zero or negative returns an empty list

        Returns:
            List of dialogue exchanges [{player_line: str, npc_response: str}, ...]

        Raises:
            CharacterNotFound: If character doesn't exist or user doesn't own it
        """
        try:
            # Ownership is validated even when the result will be empty
            character = self.get_character(character_id, user_id)
            interaction = character.npc_interactions.get(npc_id, {})
            dialogue_history = interaction.get("dialogue_history", [])

            # history[-0:] is the whole list, so a zero limit must be
            # short-circuited instead of sliced.
            if limit <= 0:
                return []

            # Return most recent exchanges (up to limit)
            return dialogue_history[-limit:] if dialogue_history else []
        except CharacterNotFound:
            raise
        except Exception as e:
            logger.error("Failed to get NPC dialogue history",
                         character_id=character_id,
                         npc_id=npc_id,
                         error=str(e))
            raise

    # ==================== End Location and NPC Tracking ====================

    def _save_character(self, character: Character) -> None:
        """
        Internal method to save character to database.

        Only the ``characterData`` field of the existing document is
        updated. No ownership check is performed here; callers are expected
        to have fetched the character through get_character first.

        Args:
            character: Character instance to save
        """
        # Serialize character to JSON
        character_json = json.dumps(character.to_dict())

        # Update in database
        self.db.update_document(
            collection_id=self.collection_id,
            document_id=character.character_id,
            data={'characterData': character_json}
        )
# Global instance for convenience; created lazily by get_character_service()
_service_instance: Optional[CharacterService] = None


def get_character_service() -> CharacterService:
    """
    Get the global CharacterService instance.

    The instance is constructed on the first call and reused afterwards.

    Returns:
        Singleton CharacterService instance
    """
    global _service_instance
    if _service_instance is None:
        _service_instance = CharacterService()
    return _service_instance