# File: Code_of_Conquest/api/app/tasks/ai_tasks.py
# Snapshot: 2025-11-24 23:10:55 -06:00
#
# 1315 lines, 41 KiB, Python

"""
AI Task Jobs for Background Processing

This module provides the base infrastructure for AI-related background jobs.
All AI generation tasks (narrative, combat, quest selection, NPC dialogue) are
processed through these job structures.

Usage:
    from app.tasks.ai_tasks import enqueue_ai_task, get_job_status, get_job_result

    # Enqueue a task
    result = enqueue_ai_task(
        task_type="narrative",
        user_id="user_123",
        context={"action": "explore"},
        priority="high"
    )
    # Returns: {"job_id": "ai_narrative_1a2b3c4d5e6f", "status": "queued"}

    # Check status
    status = get_job_status(result["job_id"])
    # Returns: {"job_id": "ai_narrative_1a2b3c4d5e6f", "status": "completed", ...}

    # Get result
    result = get_job_result(result["job_id"])
"""
import json
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from dataclasses import dataclass, asdict
from rq import Retry
from rq.job import Job
from app.tasks import get_queue, get_redis_connection, QUEUE_AI_TASKS
from app.services.redis_service import RedisService
from app.utils.logging import get_logger
# Imports for AI generation
from app.ai.narrative_generator import NarrativeGenerator, NarrativeGeneratorError
from app.ai.model_selector import UserTier
# Import for usage tracking
from app.services.usage_tracking_service import UsageTrackingService
from app.models.ai_usage import TaskType as UsageTaskType
# Import for response parsing and item validation
from app.ai.response_parser import parse_ai_response, ParsedAIResponse, GameStateChanges
from app.services.item_validator import get_item_validator, ItemValidationError
from app.services.character_service import get_character_service
# Import for template rendering
from app.ai.prompt_templates import get_prompt_templates
# Module-level logger shared by every function in this file; log calls below
# pass their context as keyword arguments (job_id=..., user_id=..., ...).
logger = get_logger(__file__)
class JobStatus(str, Enum):
    """Lifecycle state of an AI job; members compare equal to their string value."""

    # Written when the job is handed to the queue
    QUEUED = "queued"
    # Written when a worker starts executing the job
    PROCESSING = "processing"
    # Written after the task handler returned a result
    COMPLETED = "completed"
    # Written when the task handler raised
    FAILED = "failed"
class TaskType(str, Enum):
    """Kinds of AI work a job can request; the value is the wire/string form."""

    # DM response to a player action
    NARRATIVE = "narrative"
    # Narration of a combat action
    COMBAT = "combat"
    # AI picks one quest out of a list of eligible quests
    QUEST_SELECTION = "quest_selection"
    # NPC reply in a conversation
    NPC_DIALOGUE = "npc_dialogue"
class TaskPriority(str, Enum):
    """Priority a caller may request for a job; the value is the string form."""

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
@dataclass
class JobResult:
    """
    Record describing one AI task job and its outcome.

    Attributes:
        job_id: Unique job identifier
        status: Current job status
        task_type: Type of AI task
        user_id: User who requested the task
        result: Result payload, present once the job completed
        error: Error message, present when the job failed
        created_at: When the job was created
        started_at: When processing started
        completed_at: When the job finished
        retries: Number of retry attempts
    """

    job_id: str
    status: JobStatus
    task_type: str
    user_id: str
    result: Optional[dict] = None
    error: Optional[str] = None
    created_at: Optional[str] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    retries: int = 0

    def to_dict(self) -> dict:
        """Return all fields as a plain dict, with ``status`` as its string value."""
        status = self.status
        if isinstance(status, JobStatus):
            # Enum member -> raw string; anything else is passed through untouched
            status = status.value
        payload = asdict(self)
        payload['status'] = status
        return payload
# Redis key namespaces: "<prefix><job_id>" addresses a job's result / status record
JOB_RESULT_PREFIX = "ai_job_result:"
JOB_STATUS_PREFIX = "ai_job_status:"
# Lifetime in seconds of stored job results and status records (1 hour)
JOB_RESULT_TTL = 60 * 60
# Retry policy handed to RQ: retry count and the wait (seconds) before each retry
MAX_RETRIES = 3
RETRY_DELAYS = [1 * 60, 5 * 60, 15 * 60]  # 1 min, 5 min, 15 min
def enqueue_ai_task(
    task_type: str,
    user_id: str,
    context: dict,
    priority: str = "normal",
    session_id: Optional[str] = None,
    character_id: Optional[str] = None,
) -> dict:
    """
    Enqueue an AI task for background processing.

    The initial "queued" status record is written *before* the job is handed
    to RQ, so a worker that picks the job up immediately can never have its
    "processing"/"completed" record overwritten by a late "queued" write.

    Args:
        task_type: Type of task (narrative, combat, quest_selection, npc_dialogue)
        user_id: User ID requesting the task
        context: Task-specific context data
        priority: Task priority (low, normal, high)
        session_id: Optional game session ID
        character_id: Optional character ID

    Returns:
        Dictionary with job_id and status

    Raises:
        ValueError: If task_type or priority is invalid
        Exception: Whatever the queue raises if the job cannot be enqueued
            (the status record is marked failed first, best effort)
    """
    # Validate task type
    try:
        task_type_enum = TaskType(task_type)
    except ValueError:
        raise ValueError(
            f"Invalid task_type: {task_type}. Must be one of {[t.value for t in TaskType]}"
        ) from None

    # Validate priority
    try:
        priority_enum = TaskPriority(priority)
    except ValueError:
        raise ValueError(
            f"Invalid priority: {priority}. Must be one of {[p.value for p in TaskPriority]}"
        ) from None

    # Generate job ID
    job_id = f"ai_{task_type}_{uuid.uuid4().hex[:12]}"

    # Get queue
    queue = get_queue(QUEUE_AI_TASKS)

    # Determine timeout (seconds) based on task type
    timeouts = {
        TaskType.NARRATIVE: 120,
        TaskType.COMBAT: 60,
        TaskType.QUEST_SELECTION: 90,
        TaskType.NPC_DIALOGUE: 60,
    }
    timeout = timeouts.get(task_type_enum, 120)

    # Build job arguments
    job_kwargs = {
        'task_type': task_type,
        'user_id': user_id,
        'context': context,
        'session_id': session_id,
        'character_id': character_id,
        'job_id': job_id,
    }

    # Configure retry
    retry = Retry(max=MAX_RETRIES, interval=RETRY_DELAYS)

    # Priority is handled by queue position (high priority jobs go to front)
    at_front = priority_enum == TaskPriority.HIGH

    # Store the initial status first: once enqueue() returns, a worker may
    # already be updating this record.
    _store_job_status(
        job_id=job_id,
        status=JobStatus.QUEUED,
        task_type=task_type,
        user_id=user_id,
    )

    # Enqueue the job
    try:
        queue.enqueue(
            process_ai_task,
            kwargs=job_kwargs,
            job_id=job_id,
            job_timeout=timeout,
            result_ttl=JOB_RESULT_TTL,
            failure_ttl=86400,  # Keep failures for 24 hours
            retry=retry,
            at_front=at_front,
        )
    except Exception as e:
        # Don't leave a "queued" record behind for a job that never reached the queue
        try:
            _update_job_status(
                job_id,
                JobStatus.FAILED,
                error=f"Failed to enqueue job: {e}",
            )
        except Exception as status_error:
            logger.warning(
                "Failed to mark un-enqueued job as failed",
                job_id=job_id,
                error=str(status_error),
            )
        raise

    logger.info(
        "AI task enqueued",
        job_id=job_id,
        task_type=task_type,
        user_id=user_id,
        priority=priority,
        at_front=at_front,
    )
    return {
        "job_id": job_id,
        "status": JobStatus.QUEUED.value,
    }
def process_ai_task(
    task_type: str,
    user_id: str,
    context: dict,
    job_id: str,
    session_id: Optional[str] = None,
    character_id: Optional[str] = None,
) -> dict:
    """
    Entry point executed by RQ workers for every AI job.

    Marks the job as processing, routes it to the handler for its task type,
    then stores the result and marks the job completed. Any exception marks
    the job failed and is re-raised so RQ can apply its retry policy.

    Args:
        task_type: Type of task
        user_id: User ID
        context: Task context
        job_id: Job ID
        session_id: Optional session ID
        character_id: Optional character ID

    Returns:
        Result dictionary produced by the task handler
    """
    logger.info(
        "Processing AI task",
        job_id=job_id,
        task_type=task_type,
        user_id=user_id,
    )
    _update_job_status(job_id, JobStatus.PROCESSING)

    try:
        # One handler per task type; all share the same positional signature
        handlers = {
            TaskType.NARRATIVE: _process_narrative_task,
            TaskType.COMBAT: _process_combat_task,
            TaskType.QUEST_SELECTION: _process_quest_selection_task,
            TaskType.NPC_DIALOGUE: _process_npc_dialogue_task,
        }
        # TaskType(...) raises ValueError for strings that are not a known task type
        handler = handlers.get(TaskType(task_type))
        if handler is None:
            raise ValueError(f"Unknown task type: {task_type}")

        outcome = handler(user_id, context, session_id, character_id)

        # Persist the result, then flip the status record
        _store_job_result(job_id, outcome)
        _update_job_status(job_id, JobStatus.COMPLETED, result=outcome)
        logger.info(
            "AI task completed",
            job_id=job_id,
            task_type=task_type,
        )
        return outcome
    except Exception as exc:
        failure = str(exc)
        logger.error(
            "AI task failed",
            job_id=job_id,
            task_type=task_type,
            error=failure,
            exc_info=True,
        )
        _update_job_status(job_id, JobStatus.FAILED, error=failure)
        # Propagate so RQ sees the failure and can retry
        raise
def _render_action_instructions(context: dict) -> Optional[str]:
    """Return the DM instructions for the action, rendered with check_outcome when one is present."""
    dm_prompt_template = context.get('dm_prompt_template')
    check_outcome = context.get('check_outcome')
    if not (dm_prompt_template and check_outcome):
        # Nothing to render against: use the template text as-is (may be None)
        return dm_prompt_template

    # Render the dm_prompt_template as a Jinja2 template with check_outcome
    try:
        prompt_templates = get_prompt_templates()
        rendered = prompt_templates.render_string(
            dm_prompt_template,
            character=context['character'],
            game_state=context['game_state'],
            check_outcome=check_outcome
        )
        logger.debug(
            "Pre-rendered dm_prompt_template with check_outcome",
            success=(check_outcome.get('check_result') or {}).get('success')
        )
        return rendered
    except Exception as e:
        # Best effort: fall back to the unrendered template text
        logger.warning(
            "Failed to pre-render dm_prompt_template",
            error=str(e)
        )
        return dm_prompt_template


def _grant_check_outcome_rewards(
    check_outcome: Optional[dict],
    character_id: Optional[str],
    user_id: str,
) -> tuple[list, list, int]:
    """Persist items/gold from a successful check; return (items_added, items_failed, gold_changed)."""
    items_added: list = []
    items_failed: list = []
    gold_changed = 0

    # Rewards are only granted when the dice check succeeded
    if not check_outcome:
        return items_added, items_failed, gold_changed
    if not (check_outcome.get('check_result') or {}).get('success'):
        return items_added, items_failed, gold_changed

    # `or` guards tolerate keys that are present but set to None
    items_found = check_outcome.get('items_found') or []
    gold_found = check_outcome.get('gold_found') or 0

    # Add items from search to character inventory; each item is independent
    for item_data in items_found:
        try:
            # Create item grant structure for the validator
            item_grant = {
                "name": item_data.get("name"),
                "type": "consumable",  # Default type for found items
                "description": item_data.get("description", ""),
                "value": item_data.get("value", 1)
            }
            validator = get_item_validator()
            validated_item = validator.validate_and_create_item(item_grant)
            if validated_item:
                character_service = get_character_service()
                character_service.add_item_to_inventory(
                    character_id,
                    validated_item,
                    user_id
                )
                items_added.append(validated_item)
                logger.info(
                    "Item from search added to inventory",
                    item_name=validated_item.name,
                    character_id=character_id
                )
        except Exception as e:
            logger.warning(
                "Failed to add search item",
                item_name=item_data.get("name"),
                error=str(e)
            )
            items_failed.append(item_data.get("name", "Unknown"))

    if gold_found > 0:
        # Add gold from search to character
        try:
            character_service = get_character_service()
            character_service.add_gold(character_id, gold_found, user_id)
            gold_changed += gold_found
            logger.info(
                "Gold from search added",
                amount=gold_found,
                character_id=character_id
            )
        except Exception as e:
            logger.warning(
                "Failed to add search gold",
                amount=gold_found,
                error=str(e)
            )

    return items_added, items_failed, gold_changed


def _process_narrative_task(
    user_id: str,
    context: dict,
    session_id: Optional[str],
    character_id: Optional[str],
) -> dict:
    """
    Process a narrative generation task (DM response to player action).

    Items and gold come exclusively from ``check_outcome`` (predetermined dice
    rolls) and are persisted through the character service. They are NOT
    passed on to ``_update_game_session`` again: that function forwards
    ``items_added`` to ``_apply_character_changes``, which would add the same
    items to the character a second time.

    Args:
        user_id: User ID for tier lookup
        context: Must contain:
            - action: The player's action text
            - character: Character data dict
            - game_state: Game state dict
            May contain:
            - conversation_history: Optional list of previous turns
            - world_context: Optional additional world info
            - dm_prompt_template: Action-specific AI instructions
            - check_outcome: Optional dice check outcome (check_result,
              items_found, gold_found)
        session_id: Game session ID for updating
        character_id: Character ID

    Returns:
        Dictionary with dm_response, tokens_used, model, context_type,
        generation_time, items_added (names), items_failed, gold_changed and
        check_outcome

    Raises:
        ValueError: If a required context field is missing
        NarrativeGeneratorError: If generation fails (logged, then re-raised)
    """
    # Validate required context fields
    required_fields = ['action', 'character', 'game_state']
    for field in required_fields:
        if field not in context:
            raise ValueError(f"Missing required context field: {field}")

    # Get user tier for model selection
    user_tier = _get_user_tier(user_id)

    # Initialize narrative generator
    generator = NarrativeGenerator()
    check_outcome = context.get('check_outcome')

    try:
        # Pre-render dm_prompt_template if check_outcome is present
        action_instructions = _render_action_instructions(context)

        # Generate the narrative response
        response = generator.generate_story_response(
            character=context['character'],
            action=context['action'],
            game_state=context['game_state'],
            user_tier=user_tier,
            conversation_history=context.get('conversation_history'),
            world_context=context.get('world_context'),
            action_instructions=action_instructions
        )

        # Parse the AI response (extracts narrative text)
        parsed_response = parse_ai_response(response.narrative)

        # Process game state changes from dice check outcomes
        items_added, items_failed, gold_changed = _grant_check_outcome_rewards(
            check_outcome, character_id, user_id
        )

        result = {
            "dm_response": parsed_response.narrative,
            "tokens_used": response.tokens_used,
            "model": response.model,
            "context_type": response.context_type,
            "generation_time": response.generation_time,
            "items_added": [item.name for item in items_added],
            "items_failed": items_failed,
            "gold_changed": gold_changed,
            "check_outcome": check_outcome,  # Dice roll info for UI animation
        }

        # Update game session if session_id provided.
        # items_added is deliberately omitted: the items were already saved to
        # the character above, and passing them here would grant them twice.
        if session_id:
            _update_game_session(
                session_id=session_id,
                action=context['action'],
                dm_response=parsed_response.narrative,
                character_id=character_id,
                game_changes=parsed_response.game_changes
            )

        # Log AI usage
        _log_ai_usage(
            user_id=user_id,
            model=response.model,
            tokens_input=response.tokens_input,
            tokens_output=response.tokens_output,
            task_type=UsageTaskType.STORY_PROGRESSION,
            session_id=session_id,
            character_id=character_id,
            request_duration_ms=int(response.generation_time * 1000),
            success=True
        )
        logger.info(
            "Narrative task completed",
            user_id=user_id,
            tokens_used=response.tokens_used,
            model=response.model
        )
        return result
    except NarrativeGeneratorError as e:
        logger.error("Narrative generation failed", user_id=user_id, error=str(e))
        raise
def _process_combat_task(
    user_id: str,
    context: dict,
    session_id: Optional[str],
    character_id: Optional[str],
) -> dict:
    """
    Generate the narration for one combat action.

    Args:
        user_id: User ID for tier lookup
        context: Must contain:
            - character: Character data dict
            - combat_state: Combat state with enemies, round, etc.
            - action: Description of combat action
            - action_result: Result dict with hit, damage, etc.
            May contain:
            - is_critical: Optional bool for critical hit
            - is_finishing_blow: Optional bool for killing blow
        session_id: Game session ID (only recorded in the usage log)
        character_id: Character ID (only recorded in the usage log)

    Returns:
        Dictionary with combat_narrative, tokens_used, model, context_type
        and generation_time

    Raises:
        ValueError: If a required context field is missing
        NarrativeGeneratorError: If generation fails (logged, then re-raised)
    """
    # Report the first required key that is absent, in declaration order
    missing = next(
        (
            name
            for name in ('character', 'combat_state', 'action', 'action_result')
            if name not in context
        ),
        None,
    )
    if missing is not None:
        raise ValueError(f"Missing required context field: {missing}")

    tier = _get_user_tier(user_id)
    narrator = NarrativeGenerator()

    try:
        response = narrator.generate_combat_narration(
            character=context['character'],
            combat_state=context['combat_state'],
            action=context['action'],
            action_result=context['action_result'],
            user_tier=tier,
            is_critical=context.get('is_critical', False),
            is_finishing_blow=context.get('is_finishing_blow', False)
        )

        payload = {
            "combat_narrative": response.narrative,
            "tokens_used": response.tokens_used,
            "model": response.model,
            "context_type": response.context_type,
            "generation_time": response.generation_time,
        }

        # Usage accounting; generation_time is converted from seconds to ms
        duration_ms = int(response.generation_time * 1000)
        _log_ai_usage(
            user_id=user_id,
            model=response.model,
            tokens_input=response.tokens_input,
            tokens_output=response.tokens_output,
            task_type=UsageTaskType.COMBAT_NARRATION,
            session_id=session_id,
            character_id=character_id,
            request_duration_ms=duration_ms,
            success=True
        )
        logger.info(
            "Combat narration completed",
            user_id=user_id,
            tokens_used=response.tokens_used
        )
        return payload
    except NarrativeGeneratorError as exc:
        logger.error("Combat narration failed", user_id=user_id, error=str(exc))
        raise
def _process_quest_selection_task(
    user_id: str,
    context: dict,
    session_id: Optional[str],
    character_id: Optional[str],
) -> dict:
    """
    Process a quest selection task (AI selects contextually appropriate quest).

    Args:
        user_id: User ID for tier lookup
        context: Must contain:
            - character: Character data dict
            - eligible_quests: List of quest dicts that can be offered
            - game_context: Current game context (location, events, etc.)
            May contain:
            - recent_actions: Optional list of recent player actions
        session_id: Game session ID (only recorded in the usage log)
        character_id: Character ID (only recorded in the usage log)

    Returns:
        Dictionary with selected_quest_id

    Raises:
        ValueError: If a required context field is missing
        NarrativeGeneratorError: If selection fails (logged, then re-raised)
    """
    import time

    # Validate required context fields
    required_fields = ['character', 'eligible_quests', 'game_context']
    for field in required_fields:
        if field not in context:
            raise ValueError(f"Missing required context field: {field}")

    # Get user tier
    user_tier = _get_user_tier(user_id)

    # Initialize generator
    generator = NarrativeGenerator()

    try:
        # The generator only returns the quest id, so the request duration is
        # measured here instead of being logged as 0.
        started = time.monotonic()
        selected_quest_id = generator.generate_quest_selection(
            character=context['character'],
            eligible_quests=context['eligible_quests'],
            game_context=context['game_context'],
            user_tier=user_tier,
            recent_actions=context.get('recent_actions')
        )
        elapsed_ms = int((time.monotonic() - started) * 1000)

        result = {
            "selected_quest_id": selected_quest_id,
        }

        # Log AI usage. Model and token counts are estimates: the generator
        # call above does not report them for quest selection.
        _log_ai_usage(
            user_id=user_id,
            model="anthropic/claude-3.5-haiku",  # Quest selection uses fast model
            tokens_input=150,  # Estimate for quest selection prompt
            tokens_output=50,  # Estimate for quest_id response
            task_type=UsageTaskType.QUEST_SELECTION,
            session_id=session_id,
            character_id=character_id,
            request_duration_ms=elapsed_ms,
            success=True
        )
        logger.info(
            "Quest selection completed",
            user_id=user_id,
            selected_quest_id=selected_quest_id
        )
        return result
    except NarrativeGeneratorError as e:
        logger.error("Quest selection failed", user_id=user_id, error=str(e))
        raise
def _process_npc_dialogue_task(
    user_id: str,
    context: dict,
    session_id: Optional[str],
    character_id: Optional[str],
) -> dict:
    """
    Process an NPC dialogue task.

    Args:
        user_id: User ID for tier lookup
        context: Must contain:
            - character: Character data dict
            - npc: NPC data with name, role, personality, etc.
            - conversation_topic: What the player said
            - game_state: Current game state
            May contain:
            - npc_relationship: Optional relationship description
            - previous_dialogue: Optional list of previous exchanges
            - npc_knowledge: Optional list of things NPC knows
            - npc_full: Optional dict; its npc_id takes precedence over
              npc['npc_id']
        session_id: Game session ID (only recorded in the usage log)
        character_id: Character ID; when set together with an NPC id, the
            exchange is saved to the character's conversation history

    Returns:
        Dictionary with dialogue, tokens_used, model, context_type,
        generation_time, npc_name, npc_id, character_name, player_line and
        conversation_history (the history before this exchange)

    Raises:
        ValueError: If a required context field is missing
        NarrativeGeneratorError: If generation fails (logged, then re-raised)
    """
    # Validate required context fields
    required_fields = ['character', 'npc', 'conversation_topic', 'game_state']
    for field in required_fields:
        if field not in context:
            raise ValueError(f"Missing required context field: {field}")

    # Get user tier
    user_tier = _get_user_tier(user_id)

    # Initialize generator
    generator = NarrativeGenerator()

    try:
        response = generator.generate_npc_dialogue(
            character=context['character'],
            npc=context['npc'],
            conversation_topic=context['conversation_topic'],
            game_state=context['game_state'],
            user_tier=user_tier,
            npc_relationship=context.get('npc_relationship'),
            previous_dialogue=context.get('previous_dialogue'),
            npc_knowledge=context.get('npc_knowledge')
        )

        # Get NPC info for result.
        # `or {}` also covers npc_full being present but None, which would
        # otherwise fail the job after the AI response was already generated.
        npc_name = context['npc'].get('name', 'NPC')
        npc_full = context.get('npc_full') or {}
        npc_id = npc_full.get('npc_id') or context['npc'].get('npc_id')
        character_name = context['character'].get('name', 'You')

        # Get previous dialogue for display (before adding new exchange)
        previous_dialogue = context.get('previous_dialogue', [])

        result = {
            "dialogue": response.narrative,
            "tokens_used": response.tokens_used,
            "model": response.model,
            "context_type": response.context_type,
            "generation_time": response.generation_time,
            "npc_name": npc_name,
            "npc_id": npc_id,
            "character_name": character_name,
            "player_line": context['conversation_topic'],
            "conversation_history": previous_dialogue,  # History before this exchange
        }

        # Save dialogue exchange to character's conversation history
        if character_id and npc_id:
            try:
                character_service = get_character_service()
                character_service.add_npc_dialogue_exchange(
                    character_id=character_id,
                    user_id=user_id,
                    npc_id=npc_id,
                    player_line=context['conversation_topic'],
                    npc_response=response.narrative
                )
                logger.debug(
                    "NPC dialogue exchange saved",
                    character_id=character_id,
                    npc_id=npc_id
                )
            except Exception as e:
                # Don't fail the task if history save fails
                logger.warning(
                    "Failed to save NPC dialogue exchange",
                    character_id=character_id,
                    error=str(e)
                )

        # Log AI usage
        _log_ai_usage(
            user_id=user_id,
            model=response.model,
            tokens_input=response.tokens_input,
            tokens_output=response.tokens_output,
            task_type=UsageTaskType.NPC_DIALOGUE,
            session_id=session_id,
            character_id=character_id,
            request_duration_ms=int(response.generation_time * 1000),
            success=True
        )
        logger.info(
            "NPC dialogue completed",
            user_id=user_id,
            npc_name=context['npc'].get('name'),
            tokens_used=response.tokens_used
        )
        return result
    except NarrativeGeneratorError as e:
        logger.error("NPC dialogue failed", user_id=user_id, error=str(e))
        raise
def _get_user_tier(user_id: str) -> UserTier:
    """
    Resolve a user's subscription tier, falling back to FREE.

    Args:
        user_id: User ID to look up

    Returns:
        The UserTier matching the tier string reported by Appwrite
        (case-insensitive); UserTier.FREE for unrecognised strings or when
        the lookup fails for any reason
    """
    try:
        # Imported lazily, inside the guarded region, so an import problem
        # also degrades to FREE instead of failing the job
        from app.services.appwrite_service import AppwriteService

        raw_tier = AppwriteService().get_user_tier(user_id)
        known_tiers = {
            'free': UserTier.FREE,
            'basic': UserTier.BASIC,
            'premium': UserTier.PREMIUM,
            'elite': UserTier.ELITE,
        }
        return known_tiers.get(raw_tier.lower(), UserTier.FREE)
    except Exception as exc:
        logger.warning(
            "Failed to get user tier, defaulting to FREE",
            user_id=user_id,
            error=str(exc)
        )
        return UserTier.FREE
def _process_item_grants(
    game_changes: GameStateChanges,
    character_id: Optional[str],
    user_id: str
) -> tuple[list, list[str]]:
    """
    Validate and resolve every item grant listed in an AI response.

    The character is loaded from the database so that validation runs
    against its stored state. Nothing is written back by this function.

    Args:
        game_changes: GameStateChanges with items_given list
        character_id: Character ID used to load the character for validation
        user_id: User ID for logging

    Returns:
        Tuple of (list of valid Item objects, list of error messages for
        failed items). When the character cannot be loaded, the first list
        is empty and the second holds a single explanatory message.
    """
    from app.models.character import Character
    from app.models.items import Item
    from app.utils.database import get_database
    import json

    item_validator = get_item_validator()
    granted: list[Item] = []
    rejected: list[str] = []

    # Without a character there is nothing to validate against
    if not character_id:
        logger.warning(
            "No character_id provided for item validation",
            user_id=user_id
        )
        return [], ["No character ID for item validation"]

    # Load and deserialize the character (characterData is a JSON string)
    try:
        char_doc = get_database().get_row('characters', character_id)
        if not char_doc:
            return [], [f"Character not found: {character_id}"]
        raw_character = char_doc.data.get('characterData', '{}')
        character = Character.from_dict(json.loads(raw_character))
    except Exception as exc:
        logger.error(
            "Failed to fetch character for item validation",
            error=str(exc),
            character_id=character_id
        )
        return [], [f"Character fetch failed: {str(exc)}"]

    for grant in game_changes.items_given:
        item, error = item_validator.validate_and_resolve_item(grant, character)
        if not item:
            # Rejected grant: keep the validator's message for the caller
            rejected.append(error or "Unknown validation error")
            logger.warning(
                "Item grant failed validation",
                item_name=grant.name or grant.item_id,
                error=error,
                character_id=character_id,
                user_id=user_id
            )
            continue
        granted.append(item)
        logger.info(
            "Item validated for grant",
            item_name=item.name,
            item_id=item.item_id,
            character_id=character_id
        )

    return granted, rejected
def _update_game_session(
    session_id: str,
    action: str,
    dm_response: str,
    character_id: Optional[str] = None,
    game_changes: Optional[GameStateChanges] = None,
    items_added: Optional[list] = None
) -> None:
    """
    Append one turn to a game session and apply character changes.

    Loads the session's ``sessionData`` JSON, bumps the turn counter, appends
    the action/response pair to the conversation history and writes the JSON
    back. When a character id and any changes are supplied, they are then
    handed to ``_apply_character_changes``. All errors are logged and
    swallowed so the calling job does not fail.

    Args:
        session_id: Game session ID
        action: Player's action text
        dm_response: DM's narrative response
        character_id: Optional character ID
        game_changes: Optional game state changes from AI response
        items_added: Optional list of validated Item objects to add to character
    """
    try:
        from app.services.database_service import DatabaseService
        from datetime import datetime, timezone
        import json

        db = DatabaseService()

        session_doc = db.get_row('game_sessions', session_id)
        if not session_doc:
            logger.warning("Session not found for update", session_id=session_id)
            return

        # sessionData is stored as a JSON string inside the document
        session_data = json.loads(session_doc.data.get('sessionData', '{}'))

        next_turn = session_data.get('turn_number', 0) + 1
        history = session_data.get('conversation_history', [])
        history.append({
            'turn': next_turn,
            'character_id': character_id,
            'action': action,
            'dm_response': dm_response,
            'timestamp': datetime.now(timezone.utc).isoformat()
        })

        session_data['turn_number'] = next_turn
        session_data['conversation_history'] = history
        session_data['last_activity'] = datetime.now(timezone.utc).isoformat()

        # Only the sessionData field is written back
        db.update_row(
            'game_sessions',
            session_id,
            {'sessionData': json.dumps(session_data)}
        )
        logger.info(
            "Game session updated",
            session_id=session_id,
            turn_number=next_turn
        )

        # Character changes need a character and at least one kind of change
        has_changes = game_changes or items_added
        if character_id and has_changes:
            _apply_character_changes(
                character_id=character_id,
                game_changes=game_changes,
                items_added=items_added or []
            )
        # NOTE(review): per the original author's note, Appwrite Realtime
        # notifies subscribed clients on document update, so no explicit
        # trigger is issued here - confirm against the deployment.
    except Exception as exc:
        # Swallowed on purpose: the AI generation succeeded, so a session
        # bookkeeping failure must not fail the job
        logger.error(
            "Failed to update game session",
            session_id=session_id,
            error=str(exc),
            exc_info=True
        )
def _apply_character_changes(
    character_id: str,
    game_changes: Optional[GameStateChanges],
    items_added: list
) -> None:
    """
    Apply game state changes to a character in the database.

    Loads the character, adds the given items, applies gold and experience
    from ``game_changes`` and saves the character only if something actually
    changed. A gold removal that fails for insufficient funds is logged and
    does not count as a change. All errors are logged and swallowed.

    Args:
        character_id: Character ID to update
        game_changes: Game state changes (for gold, experience)
        items_added: List of validated Item objects to add
    """
    try:
        from app.services.database_service import DatabaseService
        from app.models.character import Character
        import json

        db = DatabaseService()

        # Get current character document
        char_doc = db.get_row('characters', character_id)
        if not char_doc:
            logger.warning(
                "Character not found for game state update",
                character_id=character_id
            )
            return

        # Parse character data from the nested JSON structure
        # Database stores: {userId, characterData (JSON string), is_active}
        char_json = char_doc.data.get('characterData', '{}')
        char_data = json.loads(char_json)
        character = Character.from_dict(char_data)

        changes_made = False
        gold_delta = 0  # net gold actually applied, used for the summary log

        # Add items to inventory
        if items_added:
            for item in items_added:
                character.add_item(item)
                logger.info(
                    "Added item to character inventory",
                    item_name=item.name,
                    item_id=item.item_id,
                    character_id=character_id
                )
            changes_made = True

        if game_changes:
            # Apply gold changes
            if game_changes.gold_given > 0:
                character.add_gold(game_changes.gold_given)
                gold_delta += game_changes.gold_given
                logger.info(
                    "Added gold to character",
                    gold_added=game_changes.gold_given,
                    character_id=character_id
                )
                changes_made = True

            if game_changes.gold_taken > 0:
                if character.remove_gold(game_changes.gold_taken):
                    gold_delta -= game_changes.gold_taken
                    logger.info(
                        "Removed gold from character",
                        gold_removed=game_changes.gold_taken,
                        character_id=character_id
                    )
                    # Only a successful removal is a change worth saving
                    changes_made = True
                else:
                    logger.warning(
                        "Character has insufficient gold",
                        gold_required=game_changes.gold_taken,
                        gold_available=character.gold,
                        character_id=character_id
                    )

            # Apply experience
            if game_changes.experience_given > 0:
                leveled_up = character.add_experience(game_changes.experience_given)
                logger.info(
                    "Added experience to character",
                    experience_added=game_changes.experience_given,
                    leveled_up=leveled_up,
                    character_id=character_id
                )
                changes_made = True

        # Save changes if any were made
        if changes_made:
            # Serialize back to the nested structure
            updated_char_data = character.to_dict()
            updated_doc = {
                'characterData': json.dumps(updated_char_data)
            }
            db.update_row('characters', character_id, updated_doc)
            logger.info(
                "Character updated with game state changes",
                character_id=character_id,
                items_added=len(items_added),
                gold_change=gold_delta
            )
    except Exception as e:
        logger.error(
            "Failed to apply character changes",
            character_id=character_id,
            error=str(e),
            exc_info=True
        )
        # Don't raise - session update succeeded, character update is secondary
def get_job_status(job_id: str) -> dict:
    """
    Get the current status of a job.

    The status record kept by this module is preferred; if it is missing,
    the status is derived from the RQ job itself.

    Args:
        job_id: The job ID to check

    Returns:
        Dictionary with job status information. The RQ fallback contains
        job_id, status, created_at, started_at and the finish time under both
        ``completed_at`` (the key used by the cached record) and ``ended_at``
        (kept for existing callers). If the job cannot be fetched, status is
        "unknown" and an error message is included.
    """
    redis = RedisService()

    # Try to get from our status cache first
    status_key = f"{JOB_STATUS_PREFIX}{job_id}"
    cached_status = redis.get_json(status_key)
    if cached_status:
        return cached_status

    # Fall back to RQ job status
    conn = get_redis_connection()
    try:
        job = Job.fetch(job_id, connection=conn)

        status = JobStatus.QUEUED
        if job.is_finished:
            status = JobStatus.COMPLETED
        elif job.is_failed:
            status = JobStatus.FAILED
        elif job.is_started:
            status = JobStatus.PROCESSING

        ended_at = job.ended_at.isoformat() if job.ended_at else None
        return {
            "job_id": job_id,
            "status": status.value,
            "created_at": job.created_at.isoformat() if job.created_at else None,
            "started_at": job.started_at.isoformat() if job.started_at else None,
            # Same key as the cached status record, so clients can read one field
            "completed_at": ended_at,
            "ended_at": ended_at,
        }
    except Exception as e:
        logger.warning("Failed to fetch job status", job_id=job_id, error=str(e))
        return {
            "job_id": job_id,
            "status": "unknown",
            "error": "Job not found",
        }
def get_job_result(job_id: str) -> Optional[dict]:
    """
    Look up the result of a completed job.

    Args:
        job_id: The job ID

    Returns:
        The result stored by this module if present, otherwise the RQ job's
        own result when the job is finished; None when neither is available
        or the job cannot be fetched
    """
    # Our own result store comes first
    stored = RedisService().get_json(f"{JOB_RESULT_PREFIX}{job_id}")
    if stored:
        return stored

    # Otherwise ask RQ directly
    conn = get_redis_connection()
    try:
        rq_job = Job.fetch(job_id, connection=conn)
        if not rq_job.is_finished:
            return None
        if rq_job.result:
            return rq_job.result
    except Exception as exc:
        logger.warning("Failed to fetch job result", job_id=job_id, error=str(exc))
    return None
def _store_job_status(
    job_id: str,
    status: JobStatus,
    task_type: str = "",
    user_id: str = "",
    result: Optional[dict] = None,
    error: Optional[str] = None,
) -> None:
    """
    Write a fresh status record for a job to Redis.

    The record's ``created_at`` is set to the current UTC time and both
    ``started_at`` and ``completed_at`` start out empty; the record expires
    after JOB_RESULT_TTL seconds.
    """
    store = RedisService()
    created = datetime.now(timezone.utc).isoformat()
    record = {
        "job_id": job_id,
        "status": status.value,
        "task_type": task_type,
        "user_id": user_id,
        "created_at": created,
        "started_at": None,
        "completed_at": None,
        "result": result,
        "error": error,
    }
    store.set_json(f"{JOB_STATUS_PREFIX}{job_id}", record, ttl=JOB_RESULT_TTL)
def _update_job_status(
    job_id: str,
    status: JobStatus,
    result: Optional[dict] = None,
    error: Optional[str] = None,
) -> None:
    """
    Update the status record of a job in Redis.

    Reads the existing record (or starts from an empty one if it is missing),
    sets the new status and the matching timestamp, and writes it back with a
    fresh JOB_RESULT_TTL.

    Args:
        job_id: Job whose record is updated
        status: New status
        result: Result payload to attach (ignored when empty)
        error: Error message to attach (ignored when empty)
    """
    redis = RedisService()
    status_key = f"{JOB_STATUS_PREFIX}{job_id}"

    # Get existing status
    existing = redis.get_json(status_key) or {}

    # A record rebuilt from scratch (original expired or never written)
    # must still identify its job
    existing.setdefault("job_id", job_id)

    # Update fields
    existing["status"] = status.value
    now = datetime.now(timezone.utc).isoformat()
    if status == JobStatus.PROCESSING:
        existing["started_at"] = now
        # A job can be processed again after a failed attempt (RQ retry):
        # drop the previous attempt's outcome so it cannot leak into the
        # record of the new attempt.
        existing["completed_at"] = None
        existing["error"] = None
    elif status in (JobStatus.COMPLETED, JobStatus.FAILED):
        existing["completed_at"] = now

    if result:
        existing["result"] = result
    if error:
        existing["error"] = error

    redis.set_json(status_key, existing, ttl=JOB_RESULT_TTL)
def _store_job_result(job_id: str, result: dict) -> None:
    """Save a job's result under its result key in Redis, expiring after JOB_RESULT_TTL seconds."""
    RedisService().set_json(
        f"{JOB_RESULT_PREFIX}{job_id}",
        result,
        ttl=JOB_RESULT_TTL,
    )
def _log_ai_usage(
    user_id: str,
    model: str,
    tokens_input: int,
    tokens_output: int,
    task_type: UsageTaskType,
    session_id: Optional[str] = None,
    character_id: Optional[str] = None,
    request_duration_ms: int = 0,
    success: bool = True,
    error_message: Optional[str] = None
) -> None:
    """
    Record one AI request with the usage tracking service, never raising.

    Every exception from the tracker is caught and logged so that a usage
    logging problem cannot fail the surrounding AI job.

    Args:
        user_id: User who made the request
        model: Model identifier used
        tokens_input: Number of input tokens (prompt)
        tokens_output: Number of output tokens (response)
        task_type: Type of AI task
        session_id: Optional game session ID
        character_id: Optional character ID
        request_duration_ms: Request duration in milliseconds
        success: Whether the request succeeded
        error_message: Error message if failed
    """
    try:
        UsageTrackingService().log_usage(
            user_id=user_id,
            model=model,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            task_type=task_type,
            session_id=session_id,
            character_id=character_id,
            request_duration_ms=request_duration_ms,
            success=success,
            error_message=error_message
        )
        logger.debug(
            "AI usage logged successfully",
            user_id=user_id,
            model=model,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            task_type=task_type.value
        )
    except Exception as exc:
        # Non-fatal by design: report it and carry on
        logger.error(
            "Failed to log AI usage (non-fatal)",
            user_id=user_id,
            error=str(exc)
        )