wrongmove/crawler/tests/unit/test_redis_lock.py

74 lines
2.9 KiB
Python

"""Unit tests for Redis distributed lock."""
from unittest import mock
import pytest
from utils.redis_lock import redis_lock, get_redis_client
class TestRedisLock:
"""Tests for redis_lock context manager."""
@mock.patch("utils.redis_lock.get_redis_client")
def test_lock_acquired_successfully(self, mock_get_client):
"""Test lock acquisition when no other lock exists."""
mock_client = mock.MagicMock()
mock_client.set.return_value = True
mock_get_client.return_value = mock_client
with redis_lock("test_lock") as acquired:
assert acquired is True
mock_client.set.assert_called_once_with("lock:test_lock", "1", nx=True, ex=3600 * 4)
mock_client.delete.assert_called_once_with("lock:test_lock")
@mock.patch("utils.redis_lock.get_redis_client")
def test_lock_not_acquired(self, mock_get_client):
"""Test lock not acquired when another lock exists."""
mock_client = mock.MagicMock()
mock_client.set.return_value = None # Redis returns None when nx=True fails
mock_get_client.return_value = mock_client
with redis_lock("test_lock") as acquired:
assert acquired is False
mock_client.set.assert_called_once_with("lock:test_lock", "1", nx=True, ex=3600 * 4)
# Should NOT call delete since we didn't acquire the lock
mock_client.delete.assert_not_called()
@mock.patch("utils.redis_lock.get_redis_client")
def test_lock_released_on_exception(self, mock_get_client):
"""Test lock is released even when exception occurs."""
mock_client = mock.MagicMock()
mock_client.set.return_value = True
mock_get_client.return_value = mock_client
with pytest.raises(ValueError):
with redis_lock("test_lock") as acquired:
assert acquired is True
raise ValueError("Test error")
# Lock should still be released
mock_client.delete.assert_called_once_with("lock:test_lock")
@mock.patch("utils.redis_lock.get_redis_client")
def test_custom_timeout(self, mock_get_client):
"""Test lock with custom timeout."""
mock_client = mock.MagicMock()
mock_client.set.return_value = True
mock_get_client.return_value = mock_client
with redis_lock("test_lock", timeout=300) as acquired:
assert acquired is True
mock_client.set.assert_called_once_with("lock:test_lock", "1", nx=True, ex=300)
@mock.patch("utils.redis_lock.redis")
def test_get_redis_client_uses_broker_url(self, mock_redis):
"""Test Redis client is created from CELERY_BROKER_URL."""
with mock.patch.dict("os.environ", {"CELERY_BROKER_URL": "redis://testhost:1234/5"}):
get_redis_client()
mock_redis.from_url.assert_called_once_with(
"redis://testhost:1234/5", decode_responses=True
)