From 5fce576e330f413f9ae85b6dc61aa0c9347ee080 Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Fri, 29 May 2026 05:49:15 +0000
Subject: [PATCH] fix(redis-streams): survive blocking-read timeout on idle streams
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

redis-py raises redis.exceptions.TimeoutError when a blocking XREADGROUP
returns no data within block_ms. On idle streams (US market closed → no
market:bars / signals:generated / trades:executed) every blocking read
times out; the unhandled exception tore down each worker's
asyncio.TaskGroup and exited the process, putting signal-generator,
trade-executor and learning-engine into CrashLoopBackOff. Catch it and
keep polling.

Co-Authored-By: Claude Opus 4.7
---
 shared/redis_streams.py     | 20 ++++++++++++-------
 tests/test_redis_streams.py | 38 +++++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/shared/redis_streams.py b/shared/redis_streams.py
index 718ed16..5e8095a 100644
--- a/shared/redis_streams.py
+++ b/shared/redis_streams.py
@@ -5,6 +5,7 @@ import logging
 from typing import AsyncIterator
 
 from redis.asyncio import Redis
+from redis.exceptions import TimeoutError as RedisTimeoutError
 
 logger = logging.getLogger(__name__)
 
@@ -57,13 +58,18 @@ class StreamConsumer:
         """
         await self.ensure_group()
         while True:
-            messages = await self.redis.xreadgroup(
-                self.group,
-                self.consumer,
-                {self.stream: ">"},
-                count=batch_size,
-                block=block_ms,
-            )
+            try:
+                messages = await self.redis.xreadgroup(
+                    self.group,
+                    self.consumer,
+                    {self.stream: ">"},
+                    count=batch_size,
+                    block=block_ms,
+                )
+            except RedisTimeoutError:
+                # redis-py raises this when a blocking read returns no data
+                # within block_ms (idle stream). Expected — keep polling.
+                continue
             for _stream_name, entries in messages:
                 for msg_id, fields in entries:
                     data = json.loads(fields[b"data"])
diff --git a/tests/test_redis_streams.py b/tests/test_redis_streams.py
index 6363565..863f8ab 100644
--- a/tests/test_redis_streams.py
+++ b/tests/test_redis_streams.py
@@ -95,3 +95,41 @@ async def test_consumer_consume_yields_and_acks() -> None:
     assert len(results) == 1
     assert results[0] == (b"1-0", payload)
     redis.xack.assert_called_once_with("test:stream", "grp", b"1-0")
+
+
+@pytest.mark.asyncio
+async def test_consume_survives_idle_blocking_timeout() -> None:
+    """A blocking-read TimeoutError on an idle stream must not kill the loop.
+
+    redis-py raises ``redis.exceptions.TimeoutError`` when a blocking
+    ``XREADGROUP`` returns no data within ``block_ms`` (idle stream). This is
+    an expected idle condition, not a fatal error — ``consume`` must swallow it
+    and keep polling.
+    """
+    from redis.exceptions import TimeoutError as RedisTimeoutError
+
+    redis = AsyncMock()
+    redis.xgroup_create = AsyncMock()
+
+    payload = {"ticker": "NVDA", "direction": "LONG"}
+    redis.xreadgroup = AsyncMock(
+        side_effect=[
+            RedisTimeoutError("Timeout reading from redis"),  # idle stream
+            [(b"test:stream", [(b"5-0", {b"data": json.dumps(payload).encode()})])],
+            KeyboardInterrupt,  # break the loop in the test
+        ]
+    )
+    redis.xack = AsyncMock()
+
+    consumer = StreamConsumer(redis, "test:stream", "grp", "c1")
+
+    results: list[tuple[str, dict]] = []
+    try:
+        async for msg_id, data in consumer.consume(batch_size=1, block_ms=100):
+            results.append((msg_id, data))
+    except KeyboardInterrupt:
+        pass
+
+    # The idle timeout was swallowed; the next real message still came through.
+    assert results == [(b"5-0", payload)]
+    assert redis.xreadgroup.call_count == 3