Files
Code_of_Conquest/api/app/api/sessions.py

736 lines
24 KiB
Python

"""
Sessions API Blueprint
This module provides API endpoints for story session management:
- Create new solo session
- Get session state
- Take action (async AI processing)
- Get conversation history
All endpoints require authentication and enforce ownership validation.
"""
from flask import Blueprint, request, g
from app.services.session_service import (
get_session_service,
SessionNotFound,
SessionLimitExceeded,
SessionValidationError
)
from app.services.character_service import CharacterNotFound, get_character_service
from app.services.rate_limiter_service import RateLimiterService, RateLimitExceeded
from app.services.action_prompt_loader import ActionPromptLoader, ActionPromptNotFoundError
from app.services.outcome_service import outcome_service
from app.tasks.ai_tasks import enqueue_ai_task, TaskType, get_job_status, get_job_result
from app.ai.model_selector import UserTier
from app.models.action_prompt import LocationType
from app.game_logic.dice import SkillType
from app.utils.response import (
success_response,
created_response,
accepted_response,
error_response,
not_found_response,
validation_error_response,
rate_limit_exceeded_response
)
from app.utils.auth import require_auth, get_current_user
from app.utils.logging import get_logger
from app.config import get_config
# Initialize logger
logger = get_logger(__file__)
# Create blueprint
sessions_bp = Blueprint('sessions', __name__)
# ===== VALIDATION FUNCTIONS =====
def validate_character_id(character_id: str) -> tuple[bool, str]:
"""
Validate character ID format.
Args:
character_id: Character ID to validate
Returns:
Tuple of (is_valid, error_message)
"""
if not character_id:
return False, "Character ID is required"
if not isinstance(character_id, str):
return False, "Character ID must be a string"
if len(character_id) > 100:
return False, "Character ID is too long"
return True, ""
def validate_action_request(data: dict, user_tier: UserTier) -> tuple[bool, str]:
"""
Validate action request data.
Args:
data: Request JSON data
user_tier: User's subscription tier
Returns:
Tuple of (is_valid, error_message)
"""
action_type = data.get('action_type')
if action_type != 'button':
return False, "action_type must be 'button'"
if not data.get('prompt_id'):
return False, "prompt_id is required for button actions"
return True, ""
def get_user_tier_from_user(user) -> UserTier:
"""
Get UserTier enum from user object.
Args:
user: User object from auth
Returns:
UserTier enum value
"""
# Map user tier string to UserTier enum
tier_mapping = {
'free': UserTier.FREE,
'basic': UserTier.BASIC,
'premium': UserTier.PREMIUM,
'elite': UserTier.ELITE
}
user_tier_str = getattr(user, 'tier', 'free').lower()
return tier_mapping.get(user_tier_str, UserTier.FREE)
# ===== API ENDPOINTS =====
@sessions_bp.route('/api/v1/sessions', methods=['GET'])
@require_auth
def list_sessions():
"""
List user's active game sessions.
Returns all active sessions for the authenticated user with basic session info.
Returns:
JSON response with list of sessions
"""
try:
user = get_current_user()
user_id = user.id
session_service = get_session_service()
character_service = get_character_service()
# Get user's active sessions
sessions = session_service.get_user_sessions(user_id, active_only=True)
# Build character name lookup for efficiency
character_ids = [s.solo_character_id for s in sessions if s.solo_character_id]
character_names = {}
for char_id in character_ids:
try:
char = character_service.get_character(char_id, user_id)
if char:
character_names[char_id] = char.name
except Exception:
pass # Character may have been deleted
# Build response with basic session info
sessions_list = []
for session in sessions:
# Get combat round if in combat
combat_round = None
if session.is_in_combat() and session.combat_encounter:
combat_round = session.combat_encounter.round_number
sessions_list.append({
'session_id': session.session_id,
'character_id': session.solo_character_id,
'character_name': character_names.get(session.solo_character_id),
'turn_number': session.turn_number,
'status': session.status.value,
'created_at': session.created_at,
'last_activity': session.last_activity,
'in_combat': session.is_in_combat(),
'game_state': {
'current_location': session.game_state.current_location,
'location_type': session.game_state.location_type.value,
'in_combat': session.is_in_combat(),
'combat_round': combat_round
}
})
logger.info("Sessions listed successfully",
user_id=user_id,
count=len(sessions_list))
return success_response(sessions_list)
except Exception as e:
logger.error("Failed to list sessions", error=str(e))
return error_response(f"Failed to list sessions: {str(e)}", 500)
@sessions_bp.route('/api/v1/sessions', methods=['POST'])
@require_auth
def create_session():
"""
Create a new solo game session.
Request Body:
{
"character_id": "char_456"
}
Returns:
201: Session created with initial state
400: Validation error
401: Not authenticated
404: Character not found
409: Session limit exceeded
500: Internal server error
"""
logger.info("Creating new session")
try:
# Get current user
user = get_current_user()
user_id = user.id
# Parse and validate request
data = request.get_json()
if not data:
return validation_error_response("Request body is required")
character_id = data.get('character_id')
is_valid, error_msg = validate_character_id(character_id)
if not is_valid:
return validation_error_response(error_msg)
# Create session
session_service = get_session_service()
session = session_service.create_solo_session(
user_id=user_id,
character_id=character_id
)
logger.info("Session created successfully",
session_id=session.session_id,
user_id=user_id,
character_id=character_id)
# Return session data
return created_response({
"session_id": session.session_id,
"character_id": session.solo_character_id,
"turn_number": session.turn_number,
"game_state": {
"current_location": session.game_state.current_location,
"location_type": session.game_state.location_type.value,
"active_quests": session.game_state.active_quests
}
})
except CharacterNotFound as e:
logger.warning("Character not found for session creation",
error=str(e))
return not_found_response("Character not found")
except SessionLimitExceeded as e:
logger.warning("Session limit exceeded",
user_id=user_id if 'user_id' in locals() else 'unknown',
error=str(e))
return error_response(
status=409,
code="SESSION_LIMIT_EXCEEDED",
message=str(e)
)
except Exception as e:
logger.error("Failed to create session",
error=str(e),
exc_info=True)
return error_response(
status=500,
code="SESSION_CREATE_ERROR",
message="Failed to create session"
)
@sessions_bp.route('/api/v1/sessions/<session_id>/action', methods=['POST'])
@require_auth
def take_action(session_id: str):
"""
Submit an action for AI processing (async).
Request Body:
{
"action_type": "button",
"prompt_id": "ask_locals"
}
Returns:
202: Action queued for processing
400: Validation error
401: Not authenticated
403: Action not available for tier/location
404: Session not found
429: Rate limit exceeded
500: Internal server error
"""
logger.info("Processing action request", session_id=session_id)
try:
# Get current user
user = get_current_user()
user_id = user.id
user_tier = get_user_tier_from_user(user)
# Verify session ownership and get session
session_service = get_session_service()
session = session_service.get_session(session_id, user_id)
# Parse and validate request
data = request.get_json()
if not data:
return validation_error_response("Request body is required")
is_valid, error_msg = validate_action_request(data, user_tier)
if not is_valid:
return validation_error_response(error_msg)
# Check rate limit
rate_limiter = RateLimiterService()
try:
rate_limiter.check_rate_limit(user_id, user_tier)
except RateLimitExceeded as e:
logger.warning("Rate limit exceeded",
user_id=user_id,
tier=user_tier.value)
return rate_limit_exceeded_response(
message=f"Daily turn limit reached ({e.limit} turns). Resets at {e.reset_time.strftime('%H:%M UTC')}"
)
# Build action context for AI task
prompt_id = data.get('prompt_id')
# Validate prompt exists and is available
loader = ActionPromptLoader()
try:
action_prompt = loader.get_action_by_id(prompt_id)
except ActionPromptNotFoundError:
return validation_error_response(f"Invalid prompt_id: {prompt_id}")
# Check if action is available for user's tier and location
location_type = session.game_state.location_type
if not action_prompt.is_available(user_tier, location_type):
return error_response(
status=403,
code="ACTION_NOT_AVAILABLE",
message="This action is not available for your tier or location"
)
action_text = action_prompt.display_text
dm_prompt_template = action_prompt.dm_prompt_template
# Fetch character data for AI context
character_service = get_character_service()
character = character_service.get_character(session.solo_character_id, user_id)
if not character:
return not_found_response(f"Character {session.solo_character_id} not found")
# Perform dice check if action requires it
check_outcome = None
if action_prompt.requires_check:
check_req = action_prompt.requires_check
location_type_str = session.game_state.location_type.value if hasattr(session.game_state.location_type, 'value') else str(session.game_state.location_type)
# Get DC from difficulty
dc = outcome_service.get_dc_for_difficulty(check_req.difficulty)
if check_req.check_type == "search":
# Search check - uses perception and returns items/gold
outcome = outcome_service.determine_search_outcome(
character=character,
location_type=location_type_str,
dc=dc
)
check_outcome = outcome.to_dict()
logger.info(
"Search check performed",
character_id=character.character_id,
success=outcome.check_result.success,
items_found=len(outcome.items_found),
gold_found=outcome.gold_found
)
elif check_req.check_type == "skill" and check_req.skill:
# Skill check - generic skill vs DC
try:
skill_type = SkillType[check_req.skill.upper()]
outcome = outcome_service.determine_skill_check_outcome(
character=character,
skill_type=skill_type,
dc=dc
)
check_outcome = outcome.to_dict()
logger.info(
"Skill check performed",
character_id=character.character_id,
skill=check_req.skill,
success=outcome.check_result.success
)
except (KeyError, ValueError) as e:
logger.warning(
"Invalid skill type in action prompt",
prompt_id=action_prompt.prompt_id,
skill=check_req.skill,
error=str(e)
)
# Queue AI task
# Use trimmed character data for AI prompts (reduces tokens, focuses on story-relevant info)
task_context = {
"session_id": session_id,
"character_id": session.solo_character_id,
"action": action_text,
"prompt_id": prompt_id,
"dm_prompt_template": dm_prompt_template,
"character": character.to_story_dict(),
"game_state": session.game_state.to_dict(),
"turn_number": session.turn_number,
"conversation_history": [entry.to_dict() for entry in session.conversation_history],
"world_context": None, # TODO: Add world context source when available
"check_outcome": check_outcome # Dice check result for predetermined outcomes
}
result = enqueue_ai_task(
task_type=TaskType.NARRATIVE,
user_id=user_id,
context=task_context,
priority="normal",
session_id=session_id,
character_id=session.solo_character_id
)
# Increment rate limit counter
rate_limiter.increment_usage(user_id)
logger.info("Action queued for processing",
session_id=session_id,
job_id=result.get('job_id'),
prompt_id=prompt_id)
return accepted_response({
"job_id": result.get('job_id'),
"status": result.get('status', 'queued'),
"message": "Your action is being processed..."
})
except SessionNotFound as e:
logger.warning("Session not found for action",
session_id=session_id,
error=str(e))
return not_found_response("Session not found")
except Exception as e:
logger.error("Failed to process action",
session_id=session_id,
error=str(e),
exc_info=True)
return error_response(
status=500,
code="ACTION_PROCESS_ERROR",
message="Failed to process action"
)
@sessions_bp.route('/api/v1/sessions/<session_id>', methods=['GET'])
@require_auth
def get_session_state(session_id: str):
"""
Get current session state with available actions.
Returns:
200: Session state
401: Not authenticated
404: Session not found
500: Internal server error
"""
logger.info("Getting session state", session_id=session_id)
try:
# Get current user
user = get_current_user()
user_id = user.id
user_tier = get_user_tier_from_user(user)
# Get session
session_service = get_session_service()
session = session_service.get_session(session_id, user_id)
# Get available actions based on location and tier
loader = ActionPromptLoader()
location_type = session.game_state.location_type
available_actions = []
for action in loader.get_available_actions(user_tier, location_type):
available_actions.append({
"prompt_id": action.prompt_id,
"display_text": action.display_text,
"description": action.description,
"category": action.category.value
})
logger.debug("Session state retrieved",
session_id=session_id,
turn_number=session.turn_number)
return success_response({
"session_id": session.session_id,
"character_id": session.get_character_id(),
"turn_number": session.turn_number,
"status": session.status.value,
"in_combat": session.is_in_combat(),
"game_state": {
"current_location": session.game_state.current_location,
"location_type": session.game_state.location_type.value,
"active_quests": session.game_state.active_quests,
"in_combat": session.is_in_combat()
},
"available_actions": available_actions
})
except SessionNotFound as e:
logger.warning("Session not found",
session_id=session_id,
error=str(e))
return not_found_response("Session not found")
except Exception as e:
logger.error("Failed to get session state",
session_id=session_id,
error=str(e),
exc_info=True)
return error_response(
status=500,
code="SESSION_STATE_ERROR",
message="Failed to get session state"
)
@sessions_bp.route('/api/v1/sessions/<session_id>/history', methods=['GET'])
@require_auth
def get_history(session_id: str):
"""
Get conversation history for a session.
Query Parameters:
limit: Number of entries to return (default 20)
offset: Number of entries to skip (default 0)
Returns:
200: Paginated conversation history
401: Not authenticated
404: Session not found
500: Internal server error
"""
logger.info("Getting conversation history", session_id=session_id)
try:
# Get current user
user = get_current_user()
user_id = user.id
# Get pagination params
limit = request.args.get('limit', 20, type=int)
offset = request.args.get('offset', 0, type=int)
# Clamp values
limit = max(1, min(limit, 100)) # 1-100
offset = max(0, offset)
# Verify session ownership
session_service = get_session_service()
session = session_service.get_session(session_id, user_id)
# Get total history
total_history = session.conversation_history
total_turns = len(total_history)
# Apply pagination (from beginning)
paginated_history = total_history[offset:offset + limit]
# Format history entries
history_data = []
for entry in paginated_history:
# Handle timestamp - could be datetime object or already a string
timestamp = None
if hasattr(entry, 'timestamp') and entry.timestamp:
if isinstance(entry.timestamp, str):
timestamp = entry.timestamp
else:
timestamp = entry.timestamp.isoformat()
history_data.append({
"turn": entry.turn,
"action": entry.action,
"dm_response": entry.dm_response,
"timestamp": timestamp
})
logger.debug("Conversation history retrieved",
session_id=session_id,
total=total_turns,
returned=len(history_data))
return success_response({
"total_turns": total_turns,
"history": history_data,
"pagination": {
"limit": limit,
"offset": offset,
"has_more": (offset + limit) < total_turns
}
})
except SessionNotFound as e:
logger.warning("Session not found for history",
session_id=session_id,
error=str(e))
return not_found_response("Session not found")
except Exception as e:
logger.error("Failed to get conversation history",
session_id=session_id,
error=str(e),
exc_info=True)
return error_response(
status=500,
code="HISTORY_ERROR",
message="Failed to get conversation history"
)
@sessions_bp.route('/api/v1/sessions/<session_id>', methods=['DELETE'])
@require_auth
def delete_session(session_id: str):
"""
Permanently delete a game session.
This removes the session from the database entirely. The session cannot be
recovered after deletion. Use this to free up session slots for users who
have reached their tier limit.
Returns:
200: Session deleted successfully
401: Not authenticated
404: Session not found or not owned by user
500: Internal server error
"""
logger.info("Deleting session", session_id=session_id)
try:
# Get current user
user = get_current_user()
user_id = user.id
# Delete session (validates ownership internally)
session_service = get_session_service()
session_service.delete_session(session_id, user_id)
logger.info("Session deleted successfully",
session_id=session_id,
user_id=user_id)
return success_response({
"message": "Session deleted successfully",
"session_id": session_id
})
except SessionNotFound as e:
logger.warning("Session not found for deletion",
session_id=session_id,
error=str(e))
return not_found_response("Session not found")
except Exception as e:
logger.error("Failed to delete session",
session_id=session_id,
error=str(e),
exc_info=True)
return error_response(
status=500,
code="SESSION_DELETE_ERROR",
message="Failed to delete session"
)
@sessions_bp.route('/api/v1/usage', methods=['GET'])
@require_auth
def get_usage():
"""
Get user's daily usage information.
Returns the current daily turn usage, limit, remaining turns,
and reset time. Limits are based on user's subscription tier.
Returns:
200: Usage information
{
"user_id": "user_123",
"user_tier": "free",
"current_usage": 15,
"daily_limit": 50,
"remaining": 35,
"reset_time": "2025-11-27T00:00:00+00:00",
"is_limited": false,
"is_unlimited": false
}
401: Not authenticated
500: Internal server error
"""
logger.info("Getting usage info")
try:
# Get current user and tier
user = get_current_user()
user_id = user.id
user_tier = get_user_tier_from_user(user)
# Get usage info from rate limiter
rate_limiter = RateLimiterService()
usage_info = rate_limiter.get_usage_info(user_id, user_tier)
logger.debug("Usage info retrieved",
user_id=user_id,
current_usage=usage_info.get('current_usage'),
remaining=usage_info.get('remaining'))
return success_response(usage_info)
except Exception as e:
logger.error("Failed to get usage info",
error=str(e),
exc_info=True)
return error_response(
status=500,
code="USAGE_ERROR",
message="Failed to get usage information"
)