50 lines
1.4 KiB
Python
50 lines
1.4 KiB
Python
"""Redis-based distributed locking for task coordination."""
|
|
import logging
|
|
import os
|
|
from contextlib import contextmanager
|
|
from typing import Generator
|
|
|
|
import redis
|
|
|
|
logger = logging.getLogger("uvicorn.error")
|
|
|
|
|
|
def get_redis_client() -> redis.Redis:
|
|
"""Get Redis client from Celery broker URL."""
|
|
broker_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
|
|
return redis.from_url(broker_url, decode_responses=True)
|
|
|
|
|
|
@contextmanager
|
|
def redis_lock(
|
|
lock_name: str, timeout: int = 3600 * 4
|
|
) -> Generator[bool, None, None]:
|
|
"""Distributed lock using Redis.
|
|
|
|
Args:
|
|
lock_name: Unique name for the lock
|
|
timeout: Lock expiration time in seconds (default: 4 hours)
|
|
|
|
Yields:
|
|
bool: True if lock was acquired, False otherwise
|
|
|
|
Example:
|
|
with redis_lock("scrape_listings") as acquired:
|
|
if not acquired:
|
|
logger.warning("Another scrape is already running")
|
|
return
|
|
# ... do work ...
|
|
"""
|
|
client = get_redis_client()
|
|
lock_key = f"lock:{lock_name}"
|
|
|
|
# Try to acquire the lock
|
|
acquired = client.set(lock_key, "1", nx=True, ex=timeout)
|
|
|
|
try:
|
|
yield bool(acquired)
|
|
finally:
|
|
# Release the lock only if we acquired it
|
|
if acquired:
|
|
client.delete(lock_key)
|
|
logger.info(f"Released lock: {lock_name}")
|