506 lines
15 KiB
Python
506 lines
15 KiB
Python
"""
|
|
Redis Service Wrapper
|
|
|
|
This module provides a wrapper around the redis-py client for handling caching,
|
|
job queue data, and temporary storage. It provides connection pooling, automatic
|
|
reconnection, and a clean interface for common Redis operations.
|
|
|
|
Usage:
|
|
from app.services.redis_service import RedisService
|
|
|
|
# Initialize service
|
|
redis = RedisService()
|
|
|
|
# Basic operations
|
|
redis.set("key", "value", ttl=3600) # Set with 1 hour TTL
|
|
value = redis.get("key")
|
|
redis.delete("key")
|
|
|
|
# Health check
|
|
if redis.health_check():
|
|
print("Redis is healthy")
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from typing import Optional, Any, Union
|
|
|
|
import redis
|
|
from redis.exceptions import RedisError, ConnectionError as RedisConnectionError
|
|
|
|
from app.utils.logging import get_logger
|
|
|
|
|
|
# Initialize logger
|
|
logger = get_logger(__file__)
|
|
|
|
|
|
class RedisServiceError(Exception):
|
|
"""Base exception for Redis service errors."""
|
|
pass
|
|
|
|
|
|
class RedisConnectionFailed(RedisServiceError):
|
|
"""Raised when Redis connection cannot be established."""
|
|
pass
|
|
|
|
|
|
class RedisService:
|
|
"""
|
|
Service class for interacting with Redis.
|
|
|
|
This class provides:
|
|
- Connection pooling for efficient connection management
|
|
- Basic operations: get, set, delete, exists
|
|
- TTL support for caching
|
|
- Health check for monitoring
|
|
- Automatic JSON serialization for complex objects
|
|
|
|
Attributes:
|
|
pool: Redis connection pool
|
|
client: Redis client instance
|
|
"""
|
|
|
|
def __init__(self, redis_url: Optional[str] = None):
|
|
"""
|
|
Initialize the Redis service.
|
|
|
|
Reads configuration from environment variables if not provided:
|
|
- REDIS_URL: Full Redis URL (e.g., redis://localhost:6379/0)
|
|
|
|
Args:
|
|
redis_url: Optional Redis URL to override environment variable
|
|
|
|
Raises:
|
|
RedisConnectionFailed: If connection to Redis fails
|
|
"""
|
|
self.redis_url = redis_url or os.getenv('REDIS_URL', 'redis://localhost:6379/0')
|
|
|
|
if not self.redis_url:
|
|
logger.error("Missing Redis URL configuration")
|
|
raise ValueError("Redis URL not configured. Set REDIS_URL environment variable.")
|
|
|
|
try:
|
|
# Create connection pool for efficient connection management
|
|
# Connection pooling allows multiple operations to share connections
|
|
# and automatically manages connection lifecycle
|
|
self.pool = redis.ConnectionPool.from_url(
|
|
self.redis_url,
|
|
max_connections=10,
|
|
decode_responses=True, # Return strings instead of bytes
|
|
socket_connect_timeout=5, # Connection timeout in seconds
|
|
socket_timeout=5, # Operation timeout in seconds
|
|
retry_on_timeout=True, # Retry on timeout
|
|
)
|
|
|
|
# Create client using the connection pool
|
|
self.client = redis.Redis(connection_pool=self.pool)
|
|
|
|
# Test connection
|
|
self.client.ping()
|
|
|
|
logger.info("Redis service initialized", redis_url=self._sanitize_url(self.redis_url))
|
|
|
|
except RedisConnectionError as e:
|
|
logger.error("Failed to connect to Redis", redis_url=self._sanitize_url(self.redis_url), error=str(e))
|
|
raise RedisConnectionFailed(f"Could not connect to Redis at {self._sanitize_url(self.redis_url)}: {e}")
|
|
except RedisError as e:
|
|
logger.error("Redis initialization error", error=str(e))
|
|
raise RedisServiceError(f"Redis initialization failed: {e}")
|
|
|
|
def get(self, key: str) -> Optional[str]:
|
|
"""
|
|
Get a value from Redis by key.
|
|
|
|
Args:
|
|
key: The key to retrieve
|
|
|
|
Returns:
|
|
The value as string if found, None if key doesn't exist
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
try:
|
|
value = self.client.get(key)
|
|
|
|
if value is not None:
|
|
logger.debug("Redis GET", key=key, found=True)
|
|
else:
|
|
logger.debug("Redis GET", key=key, found=False)
|
|
|
|
return value
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis GET failed", key=key, error=str(e))
|
|
raise RedisServiceError(f"Failed to get key '{key}': {e}")
|
|
|
|
def get_json(self, key: str) -> Optional[Any]:
|
|
"""
|
|
Get a value from Redis and deserialize it from JSON.
|
|
|
|
Args:
|
|
key: The key to retrieve
|
|
|
|
Returns:
|
|
The deserialized value if found, None if key doesn't exist
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails or JSON is invalid
|
|
"""
|
|
value = self.get(key)
|
|
|
|
if value is None:
|
|
return None
|
|
|
|
try:
|
|
return json.loads(value)
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to decode JSON from Redis", key=key, error=str(e))
|
|
raise RedisServiceError(f"Failed to decode JSON for key '{key}': {e}")
|
|
|
|
def set(
|
|
self,
|
|
key: str,
|
|
value: str,
|
|
ttl: Optional[int] = None,
|
|
nx: bool = False,
|
|
xx: bool = False
|
|
) -> bool:
|
|
"""
|
|
Set a value in Redis.
|
|
|
|
Args:
|
|
key: The key to set
|
|
value: The value to store (must be string)
|
|
ttl: Time to live in seconds (None for no expiration)
|
|
nx: Only set if key does not exist (for locking)
|
|
xx: Only set if key already exists
|
|
|
|
Returns:
|
|
True if the key was set, False if not set (due to nx/xx conditions)
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
try:
|
|
result = self.client.set(
|
|
key,
|
|
value,
|
|
ex=ttl, # Expiration in seconds
|
|
nx=nx, # Only set if not exists
|
|
xx=xx # Only set if exists
|
|
)
|
|
|
|
# set() returns True if set, None if not set due to nx/xx
|
|
success = result is True or result == 1
|
|
|
|
logger.debug("Redis SET", key=key, ttl=ttl, nx=nx, xx=xx, success=success)
|
|
|
|
return success
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis SET failed", key=key, error=str(e))
|
|
raise RedisServiceError(f"Failed to set key '{key}': {e}")
|
|
|
|
def set_json(
|
|
self,
|
|
key: str,
|
|
value: Any,
|
|
ttl: Optional[int] = None,
|
|
nx: bool = False,
|
|
xx: bool = False
|
|
) -> bool:
|
|
"""
|
|
Serialize a value to JSON and store it in Redis.
|
|
|
|
Args:
|
|
key: The key to set
|
|
value: The value to serialize and store (must be JSON-serializable)
|
|
ttl: Time to live in seconds (None for no expiration)
|
|
nx: Only set if key does not exist
|
|
xx: Only set if key already exists
|
|
|
|
Returns:
|
|
True if the key was set, False if not set (due to nx/xx conditions)
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails or value is not JSON-serializable
|
|
"""
|
|
try:
|
|
json_value = json.dumps(value)
|
|
except (TypeError, ValueError) as e:
|
|
logger.error("Failed to serialize value to JSON", key=key, error=str(e))
|
|
raise RedisServiceError(f"Failed to serialize value for key '{key}': {e}")
|
|
|
|
return self.set(key, json_value, ttl=ttl, nx=nx, xx=xx)
|
|
|
|
def delete(self, *keys: str) -> int:
|
|
"""
|
|
Delete one or more keys from Redis.
|
|
|
|
Args:
|
|
*keys: One or more keys to delete
|
|
|
|
Returns:
|
|
The number of keys that were deleted
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
if not keys:
|
|
return 0
|
|
|
|
try:
|
|
deleted_count = self.client.delete(*keys)
|
|
|
|
logger.debug("Redis DELETE", keys=keys, deleted_count=deleted_count)
|
|
|
|
return deleted_count
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis DELETE failed", keys=keys, error=str(e))
|
|
raise RedisServiceError(f"Failed to delete keys {keys}: {e}")
|
|
|
|
def exists(self, *keys: str) -> int:
|
|
"""
|
|
Check if one or more keys exist in Redis.
|
|
|
|
Args:
|
|
*keys: One or more keys to check
|
|
|
|
Returns:
|
|
The number of keys that exist
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
if not keys:
|
|
return 0
|
|
|
|
try:
|
|
exists_count = self.client.exists(*keys)
|
|
|
|
logger.debug("Redis EXISTS", keys=keys, exists_count=exists_count)
|
|
|
|
return exists_count
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis EXISTS failed", keys=keys, error=str(e))
|
|
raise RedisServiceError(f"Failed to check existence of keys {keys}: {e}")
|
|
|
|
def expire(self, key: str, ttl: int) -> bool:
|
|
"""
|
|
Set a TTL (time to live) on an existing key.
|
|
|
|
Args:
|
|
key: The key to set expiration on
|
|
ttl: Time to live in seconds
|
|
|
|
Returns:
|
|
True if the timeout was set, False if key doesn't exist
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
try:
|
|
result = self.client.expire(key, ttl)
|
|
|
|
logger.debug("Redis EXPIRE", key=key, ttl=ttl, success=result)
|
|
|
|
return result
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis EXPIRE failed", key=key, ttl=ttl, error=str(e))
|
|
raise RedisServiceError(f"Failed to set expiration for key '{key}': {e}")
|
|
|
|
def ttl(self, key: str) -> int:
|
|
"""
|
|
Get the remaining TTL (time to live) for a key.
|
|
|
|
Args:
|
|
key: The key to check
|
|
|
|
Returns:
|
|
TTL in seconds, -1 if key exists but has no expiry, -2 if key doesn't exist
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
try:
|
|
remaining = self.client.ttl(key)
|
|
|
|
logger.debug("Redis TTL", key=key, remaining=remaining)
|
|
|
|
return remaining
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis TTL failed", key=key, error=str(e))
|
|
raise RedisServiceError(f"Failed to get TTL for key '{key}': {e}")
|
|
|
|
def incr(self, key: str, amount: int = 1) -> int:
|
|
"""
|
|
Increment a key's value by the given amount.
|
|
|
|
If the key doesn't exist, it will be created with the increment value.
|
|
|
|
Args:
|
|
key: The key to increment
|
|
amount: Amount to increment by (default 1)
|
|
|
|
Returns:
|
|
The new value after incrementing
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails or value is not an integer
|
|
"""
|
|
try:
|
|
new_value = self.client.incrby(key, amount)
|
|
|
|
logger.debug("Redis INCR", key=key, amount=amount, new_value=new_value)
|
|
|
|
return new_value
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis INCR failed", key=key, amount=amount, error=str(e))
|
|
raise RedisServiceError(f"Failed to increment key '{key}': {e}")
|
|
|
|
def decr(self, key: str, amount: int = 1) -> int:
|
|
"""
|
|
Decrement a key's value by the given amount.
|
|
|
|
If the key doesn't exist, it will be created with the negative increment value.
|
|
|
|
Args:
|
|
key: The key to decrement
|
|
amount: Amount to decrement by (default 1)
|
|
|
|
Returns:
|
|
The new value after decrementing
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails or value is not an integer
|
|
"""
|
|
try:
|
|
new_value = self.client.decrby(key, amount)
|
|
|
|
logger.debug("Redis DECR", key=key, amount=amount, new_value=new_value)
|
|
|
|
return new_value
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis DECR failed", key=key, amount=amount, error=str(e))
|
|
raise RedisServiceError(f"Failed to decrement key '{key}': {e}")
|
|
|
|
def health_check(self) -> bool:
|
|
"""
|
|
Check if Redis connection is healthy.
|
|
|
|
This performs a PING command to verify the connection is working.
|
|
|
|
Returns:
|
|
True if Redis is healthy and responding, False otherwise
|
|
"""
|
|
try:
|
|
response = self.client.ping()
|
|
|
|
if response:
|
|
logger.debug("Redis health check passed")
|
|
return True
|
|
else:
|
|
logger.warning("Redis health check failed - unexpected response", response=response)
|
|
return False
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis health check failed", error=str(e))
|
|
return False
|
|
|
|
def info(self) -> dict:
|
|
"""
|
|
Get Redis server information.
|
|
|
|
Returns:
|
|
Dictionary containing server info (version, memory, clients, etc.)
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
try:
|
|
info = self.client.info()
|
|
|
|
logger.debug("Redis INFO retrieved", redis_version=info.get('redis_version'))
|
|
|
|
return info
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis INFO failed", error=str(e))
|
|
raise RedisServiceError(f"Failed to get Redis info: {e}")
|
|
|
|
def flush_db(self) -> bool:
|
|
"""
|
|
Delete all keys in the current database.
|
|
|
|
WARNING: This is a destructive operation. Use with caution.
|
|
|
|
Returns:
|
|
True if successful
|
|
|
|
Raises:
|
|
RedisServiceError: If the operation fails
|
|
"""
|
|
try:
|
|
self.client.flushdb()
|
|
|
|
logger.warning("Redis database flushed")
|
|
|
|
return True
|
|
|
|
except RedisError as e:
|
|
logger.error("Redis FLUSHDB failed", error=str(e))
|
|
raise RedisServiceError(f"Failed to flush database: {e}")
|
|
|
|
def close(self) -> None:
|
|
"""
|
|
Close all connections in the pool.
|
|
|
|
Call this when shutting down the application to cleanly release connections.
|
|
"""
|
|
try:
|
|
self.pool.disconnect()
|
|
logger.info("Redis connection pool closed")
|
|
except Exception as e:
|
|
logger.error("Error closing Redis connection pool", error=str(e))
|
|
|
|
def _sanitize_url(self, url: str) -> str:
|
|
"""
|
|
Remove password from Redis URL for safe logging.
|
|
|
|
Args:
|
|
url: Redis URL that may contain password
|
|
|
|
Returns:
|
|
URL with password masked
|
|
"""
|
|
# Simple sanitization - mask password if present
|
|
# Format: redis://user:password@host:port/db
|
|
if '@' in url:
|
|
# Split on @ and mask everything before it except the protocol
|
|
parts = url.split('@')
|
|
protocol_and_creds = parts[0]
|
|
host_and_rest = parts[1]
|
|
|
|
if '://' in protocol_and_creds:
|
|
protocol = protocol_and_creds.split('://')[0]
|
|
return f"{protocol}://***@{host_and_rest}"
|
|
|
|
return url
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry."""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit - close connections."""
|
|
self.close()
|
|
return False
|