"""Redis-based distributed locking for task coordination.""" import logging import os from contextlib import contextmanager from typing import Generator import redis logger = logging.getLogger("uvicorn.error") def get_redis_client() -> redis.Redis: """Get Redis client from Celery broker URL.""" broker_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") return redis.from_url(broker_url, decode_responses=True) @contextmanager def redis_lock( lock_name: str, timeout: int = 3600 * 4 ) -> Generator[bool, None, None]: """Distributed lock using Redis. Args: lock_name: Unique name for the lock timeout: Lock expiration time in seconds (default: 4 hours) Yields: bool: True if lock was acquired, False otherwise Example: with redis_lock("scrape_listings") as acquired: if not acquired: logger.warning("Another scrape is already running") return # ... do work ... """ client = get_redis_client() lock_key = f"lock:{lock_name}" # Try to acquire the lock acquired = client.set(lock_key, "1", nx=True, ex=timeout) try: yield bool(acquired) finally: # Release the lock only if we acquired it if acquired: client.delete(lock_key) logger.info(f"Released lock: {lock_name}")