Files
Code_of_Conquest/api/scripts/queue_info.py
2025-11-24 23:10:55 -06:00

147 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
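RQ Queue Monitoring Utility

Displays information about RQ queues and their jobs.
Queue status is always printed; the flags below add further sections.

Usage:
    python scripts/queue_info.py            # Show all queues
    python scripts/queue_info.py --failed   # Show failed jobs
    python scripts/queue_info.py --workers  # Show active workers
    python scripts/queue_info.py --running  # Show running jobs
    python scripts/queue_info.py --all      # Show all information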
"""
import argparse
import sys
import os
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from redis import Redis
from rq import Queue, Worker
from rq.job import Job
from rq.registry import FailedJobRegistry, StartedJobRegistry
from app.tasks import ALL_QUEUES, get_redis_connection, get_all_queues_info
def show_queue_info():
    """Print a status summary for each entry returned by get_all_queues_info().

    Every entry is indexed by the keys 'name', 'description', 'count',
    'default_timeout' and 'default_result_ttl'; one line is printed per key.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("RQ Queue Status")
    print(banner)

    for queue_entry in get_all_queues_info():
        print(f"\nQueue: {queue_entry['name']}")
        print(f" Description: {queue_entry['description']}")
        print(f" Jobs in queue: {queue_entry['count']}")
        print(f" Default timeout: {queue_entry['default_timeout']}s")
        print(f" Result TTL: {queue_entry['default_result_ttl']}s")
def show_failed_jobs():
    """Display failed jobs from all queues.

    For every queue name in ALL_QUEUES, prints how many job IDs are in that
    queue's FailedJobRegistry, followed by details (function name, end time
    and the last line of the stored traceback) for at most the first 10 jobs.
    Jobs whose data can no longer be fetched are reported instead of
    aborting the whole listing.
    """
    max_shown = 10  # Cap the per-queue detail output.

    print("\n" + "=" * 60)
    print("Failed Jobs")
    print("=" * 60)

    conn = get_redis_connection()
    for queue_name in ALL_QUEUES:
        queue = Queue(queue_name, connection=conn)
        job_ids = FailedJobRegistry(queue=queue).get_job_ids()

        if not job_ids:
            print(f"\nQueue: {queue_name} (no failed jobs)")
            continue

        print(f"\nQueue: {queue_name} ({len(job_ids)} failed)")
        shown_ids = job_ids[:max_shown]
        # fetch_many yields None for IDs whose job data no longer exists,
        # whereas Job.fetch would raise and abort the report.
        jobs = Job.fetch_many(shown_ids, connection=conn)
        for job_id, job in zip(shown_ids, jobs):
            print(f" - {job_id}")
            if job is None:
                print(" (job data no longer available)")
                continue
            print(f" Function: {job.func_name}")
            print(f" Failed at: {job.ended_at}")
            if job.exc_info:
                # The last non-empty traceback line carries the exception
                # type and message; strip() makes this independent of
                # whether the stored text ends with a newline.
                exc_lines = job.exc_info.strip().splitlines()
                exc_line = exc_lines[-1] if exc_lines else 'Unknown'
                print(f" Error: {exc_line[:80]}")
def show_workers():
    """Print every worker returned by Worker.all() and its current job, if any.

    For each worker the name, state, queue names and PID are printed; when
    the worker reports a current job, that job's id and function name follow.
    """
    separator = "=" * 60
    print("\n" + separator)
    print("Active Workers")
    print(separator)

    redis_conn = get_redis_connection()
    found = Worker.all(connection=redis_conn)

    if found:
        for wk in found:
            print(f"\nWorker: {wk.name}")
            print(f" State: {wk.get_state()}")
            queue_names = ', '.join(q.name for q in wk.queues)
            print(f" Queues: {queue_names}")
            print(f" PID: {wk.pid}")

            running = wk.get_current_job()
            if not running:
                continue
            print(f" Current job: {running.id}")
            print(f" Function: {running.func_name}")
    else:
        print("\nNo active workers found.")
def show_started_jobs():
    """Display currently running jobs.

    For every queue name in ALL_QUEUES, prints the job IDs held in that
    queue's StartedJobRegistry together with each job's function name and
    start time. A job that finishes and is removed between listing and
    fetching is reported as unavailable rather than crashing the report.
    """
    print("\n" + "=" * 60)
    print("Running Jobs")
    print("=" * 60)

    conn = get_redis_connection()
    for queue_name in ALL_QUEUES:
        queue = Queue(queue_name, connection=conn)
        job_ids = StartedJobRegistry(queue=queue).get_job_ids()

        if not job_ids:
            print(f"\nQueue: {queue_name} (no running jobs)")
            continue

        print(f"\nQueue: {queue_name} ({len(job_ids)} running)")
        # fetch_many yields None for IDs whose job data no longer exists,
        # whereas Job.fetch would raise and abort the report.
        jobs = Job.fetch_many(job_ids, connection=conn)
        for job_id, job in zip(job_ids, jobs):
            print(f" - {job_id}")
            if job is None:
                print(" (job data no longer available)")
                continue
            print(f" Function: {job.func_name}")
            print(f" Started at: {job.started_at}")
def main():
    """Parse the command-line flags and print the requested report sections.

    Queue status is always printed first. The workers, running-jobs and
    failed-jobs sections follow, in that order, when their own flag or
    --all is given.
    """
    parser = argparse.ArgumentParser(description='RQ Queue Monitoring Utility')
    flag_help = (
        ('--failed', 'Show failed jobs'),
        ('--workers', 'Show active workers'),
        ('--running', 'Show running jobs'),
        ('--all', 'Show all information'),
    )
    for flag, help_text in flag_help:
        parser.add_argument(flag, action='store_true', help=help_text)
    opts = parser.parse_args()

    # Always show queue info
    show_queue_info()

    # (flag value, section printer) pairs, listed in output order.
    optional_sections = (
        (opts.workers, show_workers),
        (opts.running, show_started_jobs),
        (opts.failed, show_failed_jobs),
    )
    for requested, render in optional_sections:
        if opts.all or requested:
            render()

    rule = "=" * 60
    print("\n" + rule)
    print("Done")
    print(rule + "\n")
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()