""" RQ Task Queue Configuration This module defines the job queues used for background task processing. All async operations (AI generation, combat processing, marketplace tasks) are processed through these queues. Queue Types: - ai_tasks: AI narrative generation (highest priority) - combat_tasks: Combat processing - marketplace_tasks: Auction cleanup and periodic tasks (lowest priority) Usage: from app.tasks import get_queue, QUEUE_AI_TASKS queue = get_queue(QUEUE_AI_TASKS) job = queue.enqueue(my_function, arg1, arg2) """ import os from typing import Optional from redis import Redis from rq import Queue from app.utils.logging import get_logger # Initialize logger logger = get_logger(__file__) # Queue names QUEUE_AI_TASKS = 'ai_tasks' QUEUE_COMBAT_TASKS = 'combat_tasks' QUEUE_MARKETPLACE_TASKS = 'marketplace_tasks' # All queue names in priority order (highest first) ALL_QUEUES = [ QUEUE_AI_TASKS, QUEUE_COMBAT_TASKS, QUEUE_MARKETPLACE_TASKS, ] # Queue configurations QUEUE_CONFIG = { QUEUE_AI_TASKS: { 'default_timeout': 120, # 2 minutes for AI generation 'default_result_ttl': 3600, # Keep results for 1 hour 'default_failure_ttl': 86400, # Keep failures for 24 hours 'description': 'AI narrative generation tasks', }, QUEUE_COMBAT_TASKS: { 'default_timeout': 60, # 1 minute for combat 'default_result_ttl': 3600, 'default_failure_ttl': 86400, 'description': 'Combat processing tasks', }, QUEUE_MARKETPLACE_TASKS: { 'default_timeout': 300, # 5 minutes for marketplace 'default_result_ttl': 3600, 'default_failure_ttl': 86400, 'description': 'Marketplace and auction tasks', }, } # Redis connection singleton _redis_connection: Optional[Redis] = None def get_redis_connection() -> Redis: """ Get the Redis connection for RQ. Uses a singleton pattern to reuse the connection. Returns: Redis connection instance """ global _redis_connection if _redis_connection is None: redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0') _redis_connection = Redis.from_url(redis_url) logger.info("RQ Redis connection established", redis_url=redis_url.split('@')[-1]) return _redis_connection def get_queue(queue_name: str) -> Queue: """ Get an RQ queue by name. Args: queue_name: Name of the queue (use constants like QUEUE_AI_TASKS) Returns: RQ Queue instance Raises: ValueError: If queue name is not recognized """ if queue_name not in QUEUE_CONFIG: raise ValueError(f"Unknown queue: {queue_name}. Must be one of {list(QUEUE_CONFIG.keys())}") config = QUEUE_CONFIG[queue_name] conn = get_redis_connection() return Queue( name=queue_name, connection=conn, default_timeout=config['default_timeout'], ) def get_all_queues() -> list[Queue]: """ Get all configured queues in priority order. Returns: List of Queue instances (highest priority first) """ return [get_queue(name) for name in ALL_QUEUES] def get_queue_info(queue_name: str) -> dict: """ Get information about a queue. Args: queue_name: Name of the queue Returns: Dictionary with queue statistics """ queue = get_queue(queue_name) config = QUEUE_CONFIG[queue_name] return { 'name': queue_name, 'description': config['description'], 'count': len(queue), 'default_timeout': config['default_timeout'], 'default_result_ttl': config['default_result_ttl'], } def get_all_queues_info() -> list[dict]: """ Get information about all queues. Returns: List of queue info dictionaries """ return [get_queue_info(name) for name in ALL_QUEUES]