diff --git a/shared/redis_streams.py b/shared/redis_streams.py index 718ed16..5e8095a 100644 --- a/shared/redis_streams.py +++ b/shared/redis_streams.py @@ -5,6 +5,7 @@ import logging from typing import AsyncIterator from redis.asyncio import Redis +from redis.exceptions import TimeoutError as RedisTimeoutError logger = logging.getLogger(__name__) @@ -57,13 +58,18 @@ class StreamConsumer: """ await self.ensure_group() while True: - messages = await self.redis.xreadgroup( - self.group, - self.consumer, - {self.stream: ">"}, - count=batch_size, - block=block_ms, - ) + try: + messages = await self.redis.xreadgroup( + self.group, + self.consumer, + {self.stream: ">"}, + count=batch_size, + block=block_ms, + ) + except RedisTimeoutError: + # redis-py raises this when a blocking read returns no data + # within block_ms (idle stream). Expected — keep polling. + continue for _stream_name, entries in messages: for msg_id, fields in entries: data = json.loads(fields[b"data"]) diff --git a/tests/test_redis_streams.py b/tests/test_redis_streams.py index 6363565..863f8ab 100644 --- a/tests/test_redis_streams.py +++ b/tests/test_redis_streams.py @@ -95,3 +95,41 @@ async def test_consumer_consume_yields_and_acks() -> None: assert len(results) == 1 assert results[0] == (b"1-0", payload) redis.xack.assert_called_once_with("test:stream", "grp", b"1-0") + + +@pytest.mark.asyncio +async def test_consume_survives_idle_blocking_timeout() -> None: + """A blocking-read TimeoutError on an idle stream must not kill the loop. + + redis-py raises ``redis.exceptions.TimeoutError`` when a blocking + ``XREADGROUP`` returns no data within ``block_ms`` (idle stream). This is + an expected idle condition, not a fatal error — ``consume`` must swallow it + and keep polling. + """ + from redis.exceptions import TimeoutError as RedisTimeoutError + + redis = AsyncMock() + redis.xgroup_create = AsyncMock() + + payload = {"ticker": "NVDA", "direction": "LONG"} + redis.xreadgroup = AsyncMock( + side_effect=[ + RedisTimeoutError("Timeout reading from redis"), # idle stream + [(b"test:stream", [(b"5-0", {b"data": json.dumps(payload).encode()})])], + KeyboardInterrupt, # break the loop in the test + ] + ) + redis.xack = AsyncMock() + + consumer = StreamConsumer(redis, "test:stream", "grp", "c1") + + results: list[tuple[str, dict]] = [] + try: + async for msg_id, data in consumer.consume(batch_size=1, block_ms=100): + results.append((msg_id, data)) + except KeyboardInterrupt: + pass + + # The idle timeout was swallowed; the next real message still came through. + assert results == [(b"5-0", payload)] + assert redis.xreadgroup.call_count == 3