""" Session Service - CRUD operations for game sessions. This service handles creating, reading, updating, and managing game sessions, with support for both solo and multiplayer sessions. """ import json from typing import List, Optional from datetime import datetime, timezone from appwrite.query import Query from appwrite.id import ID from app.models.session import GameSession, GameState, ConversationEntry, SessionConfig from app.models.enums import SessionStatus, SessionType from app.models.action_prompt import LocationType from app.services.database_service import get_database_service from app.services.appwrite_service import AppwriteService from app.services.character_service import get_character_service, CharacterNotFound from app.services.location_loader import get_location_loader from app.services.chat_message_service import get_chat_message_service from app.utils.logging import get_logger from app.config import get_config logger = get_logger(__file__) class SessionNotFound(Exception): """Raised when session ID doesn't exist or user doesn't own it.""" pass class SessionLimitExceeded(Exception): """Raised when user tries to create more sessions than allowed.""" pass class SessionValidationError(Exception): """Raised when session validation fails.""" pass class SessionService: """ Service for managing game sessions. This service provides: - Session creation (solo and multiplayer) - Session retrieval and listing - Session state updates - Conversation history management - Game state tracking """ def __init__(self): """Initialize the session service with dependencies.""" self.db = get_database_service() self.appwrite = AppwriteService() self.character_service = get_character_service() self.collection_id = "game_sessions" logger.info("SessionService initialized") def create_solo_session( self, user_id: str, character_id: str, starting_location: Optional[str] = None, starting_location_type: Optional[LocationType] = None ) -> GameSession: """ Create a new solo game session. This method: 1. Validates user owns the character 2. Validates user hasn't exceeded session limit 3. Determines starting location from location data 4. Creates session with initial game state 5. Stores in Appwrite database Args: user_id: Owner's user ID character_id: Character ID for this session starting_location: Initial location ID (optional, uses default starting location) starting_location_type: Initial location type (optional, derived from location data) Returns: Created GameSession instance Raises: CharacterNotFound: If character doesn't exist or user doesn't own it SessionLimitExceeded: If user has reached active session limit """ try: logger.info("Creating solo session", user_id=user_id, character_id=character_id) # Validate user owns the character character = self.character_service.get_character(character_id, user_id) if not character: raise CharacterNotFound(f"Character not found: {character_id}") # Determine starting location from location data if not provided if not starting_location: location_loader = get_location_loader() starting_locations = location_loader.get_starting_locations() if starting_locations: # Use first starting location (usually crossville_village) start_loc = starting_locations[0] starting_location = start_loc.location_id # Convert from enums.LocationType to action_prompt.LocationType via string value starting_location_type = LocationType(start_loc.location_type.value) logger.info("Using starting location from data", location_id=starting_location, location_type=starting_location_type.value) else: # Fallback to crossville_village starting_location = "crossville_village" starting_location_type = LocationType.TOWN logger.warning("No starting locations found, using fallback", location_id=starting_location) # Ensure location type is set if not starting_location_type: starting_location_type = LocationType.TOWN # Check session limit based on user's subscription tier user_tier = self.appwrite.get_user_tier(user_id) config = get_config() tier_config = config.rate_limiting.tiers.get(user_tier) max_sessions = tier_config.max_sessions if tier_config else 1 active_count = self.count_user_sessions(user_id, active_only=True) if active_count >= max_sessions: logger.warning("Session limit exceeded", user_id=user_id, tier=user_tier, current=active_count, limit=max_sessions) raise SessionLimitExceeded( f"Maximum active sessions reached for {user_tier} tier ({active_count}/{max_sessions}). " f"Please delete an existing session to start a new one." ) # Generate unique session ID session_id = ID.unique() # Create game state with starting location game_state = GameState( current_location=starting_location, location_type=starting_location_type, discovered_locations=[starting_location], active_quests=[], world_events=[] ) # Create session instance session = GameSession( session_id=session_id, session_type=SessionType.SOLO, solo_character_id=character_id, user_id=user_id, party_member_ids=[], config=SessionConfig(), game_state=game_state, turn_order=[character_id], current_turn=0, turn_number=0, status=SessionStatus.ACTIVE ) # Serialize and store session_dict = session.to_dict() session_json = json.dumps(session_dict) document_data = { 'userId': user_id, 'characterId': character_id, 'sessionData': session_json, 'status': SessionStatus.ACTIVE.value, 'sessionType': SessionType.SOLO.value } self.db.create_document( collection_id=self.collection_id, data=document_data, document_id=session_id ) logger.info("Solo session created successfully", session_id=session_id, user_id=user_id, character_id=character_id) return session except (CharacterNotFound, SessionLimitExceeded): raise except Exception as e: logger.error("Failed to create solo session", user_id=user_id, character_id=character_id, error=str(e)) raise def get_session(self, session_id: str, user_id: Optional[str] = None) -> GameSession: """ Get a session by ID. Args: session_id: Session ID user_id: Optional user ID for ownership validation Returns: GameSession instance Raises: SessionNotFound: If session doesn't exist or user doesn't own it """ try: logger.debug("Fetching session", session_id=session_id) # Get document from database document = self.db.get_row(self.collection_id, session_id) if not document: logger.warning("Session not found", session_id=session_id) raise SessionNotFound(f"Session not found: {session_id}") # Verify ownership if user_id provided if user_id and document.data.get('userId') != user_id: logger.warning("Session ownership mismatch", session_id=session_id, expected_user=user_id, actual_user=document.data.get('userId')) raise SessionNotFound(f"Session not found: {session_id}") # Parse session data session_json = document.data.get('sessionData') session_dict = json.loads(session_json) session = GameSession.from_dict(session_dict) logger.debug("Session fetched successfully", session_id=session_id) return session except SessionNotFound: raise except Exception as e: logger.error("Failed to fetch session", session_id=session_id, error=str(e)) raise def update_session(self, session: GameSession) -> GameSession: """ Update a session in the database. Args: session: GameSession instance with updated data Returns: Updated GameSession instance """ try: logger.debug("Updating session", session_id=session.session_id) # Serialize session session_dict = session.to_dict() session_json = json.dumps(session_dict) # Update in database self.db.update_document( collection_id=self.collection_id, document_id=session.session_id, data={ 'sessionData': session_json, 'status': session.status.value } ) logger.debug("Session updated successfully", session_id=session.session_id) return session except Exception as e: logger.error("Failed to update session", session_id=session.session_id, error=str(e)) raise def get_user_sessions( self, user_id: str, active_only: bool = True, limit: int = 25 ) -> List[GameSession]: """ Get all sessions for a user. Args: user_id: User ID active_only: If True, only return active sessions limit: Maximum number of sessions to return Returns: List of GameSession instances """ try: logger.debug("Fetching user sessions", user_id=user_id, active_only=active_only) # Build query queries = [Query.equal('userId', user_id)] if active_only: queries.append(Query.equal('status', SessionStatus.ACTIVE.value)) documents = self.db.list_rows( table_id=self.collection_id, queries=queries, limit=limit ) # Parse all sessions sessions = [] for document in documents: try: session_json = document.data.get('sessionData') session_dict = json.loads(session_json) session = GameSession.from_dict(session_dict) sessions.append(session) except Exception as e: logger.error("Failed to parse session", document_id=document.id, error=str(e)) continue logger.debug("User sessions fetched", user_id=user_id, count=len(sessions)) return sessions except Exception as e: logger.error("Failed to fetch user sessions", user_id=user_id, error=str(e)) raise def count_user_sessions(self, user_id: str, active_only: bool = True) -> int: """ Count sessions for a user. Args: user_id: User ID active_only: If True, only count active sessions Returns: Number of sessions """ try: queries = [Query.equal('userId', user_id)] if active_only: queries.append(Query.equal('status', SessionStatus.ACTIVE.value)) count = self.db.count_documents( collection_id=self.collection_id, queries=queries ) logger.debug("Session count", user_id=user_id, active_only=active_only, count=count) return count except Exception as e: logger.error("Failed to count sessions", user_id=user_id, error=str(e)) return 0 def end_session(self, session_id: str, user_id: str) -> GameSession: """ End a session by marking it as completed. Args: session_id: Session ID user_id: User ID for ownership validation Returns: Updated GameSession instance Raises: SessionNotFound: If session doesn't exist or user doesn't own it """ try: logger.info("Ending session", session_id=session_id, user_id=user_id) session = self.get_session(session_id, user_id) session.status = SessionStatus.COMPLETED session.update_activity() return self.update_session(session) except SessionNotFound: raise except Exception as e: logger.error("Failed to end session", session_id=session_id, error=str(e)) raise def delete_session(self, session_id: str, user_id: str) -> bool: """ Permanently delete a session from the database. Unlike end_session(), this method removes the session document entirely from the database. Use this when the user wants to free up their session slot and doesn't need to preserve the game history. Also deletes all chat messages associated with this session. Args: session_id: Session ID to delete user_id: User ID for ownership validation Returns: True if deleted successfully Raises: SessionNotFound: If session doesn't exist or user doesn't own it """ try: logger.info("Deleting session", session_id=session_id, user_id=user_id) # Verify ownership first (raises SessionNotFound if invalid) self.get_session(session_id, user_id) # Delete associated chat messages first chat_service = get_chat_message_service() deleted_messages = chat_service.delete_messages_by_session(session_id) logger.info("Deleted associated chat messages", session_id=session_id, message_count=deleted_messages) # Delete session from database self.db.delete_document( collection_id=self.collection_id, document_id=session_id ) logger.info("Session deleted successfully", session_id=session_id, user_id=user_id) return True except SessionNotFound: raise except Exception as e: logger.error("Failed to delete session", session_id=session_id, error=str(e)) raise def add_conversation_entry( self, session_id: str, character_id: str, character_name: str, action: str, dm_response: str, combat_log: Optional[List] = None, quest_offered: Optional[dict] = None ) -> GameSession: """ Add an entry to the conversation history. This method automatically: - Increments turn number - Adds timestamp - Updates last activity Args: session_id: Session ID character_id: Acting character's ID character_name: Acting character's name action: Player's action text dm_response: AI DM's response combat_log: Optional combat actions quest_offered: Optional quest offering info Returns: Updated GameSession instance """ try: logger.debug("Adding conversation entry", session_id=session_id, character_id=character_id) session = self.get_session(session_id) # Create conversation entry entry = ConversationEntry( turn=session.turn_number + 1, character_id=character_id, character_name=character_name, action=action, dm_response=dm_response, combat_log=combat_log or [], quest_offered=quest_offered ) # Add entry and increment turn session.conversation_history.append(entry) session.turn_number += 1 session.update_activity() # Save to database return self.update_session(session) except Exception as e: logger.error("Failed to add conversation entry", session_id=session_id, error=str(e)) raise def get_conversation_history( self, session_id: str, limit: Optional[int] = None, offset: int = 0 ) -> List[ConversationEntry]: """ Get conversation history for a session. Args: session_id: Session ID limit: Maximum entries to return (None for all) offset: Number of entries to skip from end Returns: List of ConversationEntry instances """ try: session = self.get_session(session_id) history = session.conversation_history # Apply offset (from end) if offset > 0: history = history[:-offset] if offset < len(history) else [] # Apply limit (from end) if limit and len(history) > limit: history = history[-limit:] return history except Exception as e: logger.error("Failed to get conversation history", session_id=session_id, error=str(e)) raise def get_recent_history(self, session_id: str, num_turns: int = 3) -> List[ConversationEntry]: """ Get the most recent conversation entries for AI context. Args: session_id: Session ID num_turns: Number of recent turns to return Returns: List of most recent ConversationEntry instances """ return self.get_conversation_history(session_id, limit=num_turns) def update_location( self, session_id: str, new_location: str, location_type: LocationType ) -> GameSession: """ Update the current location in the session. Also adds location to discovered_locations if not already there. Args: session_id: Session ID new_location: New location name location_type: New location type Returns: Updated GameSession instance """ try: logger.debug("Updating location", session_id=session_id, new_location=new_location) session = self.get_session(session_id) session.game_state.current_location = new_location session.game_state.location_type = location_type # Track discovered locations if new_location not in session.game_state.discovered_locations: session.game_state.discovered_locations.append(new_location) session.update_activity() return self.update_session(session) except Exception as e: logger.error("Failed to update location", session_id=session_id, error=str(e)) raise def add_discovered_location(self, session_id: str, location: str) -> GameSession: """ Add a location to the discovered locations list. Args: session_id: Session ID location: Location name to add Returns: Updated GameSession instance """ try: session = self.get_session(session_id) if location not in session.game_state.discovered_locations: session.game_state.discovered_locations.append(location) session.update_activity() return self.update_session(session) return session except Exception as e: logger.error("Failed to add discovered location", session_id=session_id, error=str(e)) raise def add_active_quest(self, session_id: str, quest_id: str) -> GameSession: """ Add a quest to the active quests list. Validates max 2 active quests limit. Args: session_id: Session ID quest_id: Quest ID to add Returns: Updated GameSession instance Raises: SessionValidationError: If max quests limit exceeded """ try: session = self.get_session(session_id) # Check max active quests (2) if len(session.game_state.active_quests) >= 2: raise SessionValidationError( "Maximum active quests reached (2/2). " "Complete or abandon a quest to accept a new one." ) if quest_id not in session.game_state.active_quests: session.game_state.active_quests.append(quest_id) session.update_activity() return self.update_session(session) return session except SessionValidationError: raise except Exception as e: logger.error("Failed to add active quest", session_id=session_id, quest_id=quest_id, error=str(e)) raise def remove_active_quest(self, session_id: str, quest_id: str) -> GameSession: """ Remove a quest from the active quests list. Args: session_id: Session ID quest_id: Quest ID to remove Returns: Updated GameSession instance """ try: session = self.get_session(session_id) if quest_id in session.game_state.active_quests: session.game_state.active_quests.remove(quest_id) session.update_activity() return self.update_session(session) return session except Exception as e: logger.error("Failed to remove active quest", session_id=session_id, quest_id=quest_id, error=str(e)) raise def add_world_event(self, session_id: str, event: dict) -> GameSession: """ Add a world event to the session. Args: session_id: Session ID event: Event dictionary with type, description, etc. Returns: Updated GameSession instance """ try: session = self.get_session(session_id) # Add timestamp if not present if 'timestamp' not in event: event['timestamp'] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") session.game_state.world_events.append(event) session.update_activity() return self.update_session(session) except Exception as e: logger.error("Failed to add world event", session_id=session_id, error=str(e)) raise # Global instance for convenience _service_instance: Optional[SessionService] = None def get_session_service() -> SessionService: """ Get the global SessionService instance. Returns: Singleton SessionService instance """ global _service_instance if _service_instance is None: _service_instance = SessionService() return _service_instance