All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
redis-py raises redis.exceptions.TimeoutError when a blocking XREADGROUP returns no data within block_ms. On idle streams (US market closed → no market:bars / signals:generated / trades:executed) every blocking read times out; the unhandled exception tore down each worker's asyncio.TaskGroup and exited the process, putting signal-generator, trade-executor and learning-engine into CrashLoopBackOff. Catch it and keep polling. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
77 lines
2.8 KiB
Python
77 lines
2.8 KiB
Python
"""Thin wrappers around redis-py Streams for publish/consume with JSON serialization."""
|
|
|
|
import json
|
|
import logging
|
|
from typing import AsyncIterator
|
|
|
|
from redis.asyncio import Redis
|
|
from redis.exceptions import TimeoutError as RedisTimeoutError
|
|
|
|
# Module-level logger used by StreamPublisher and StreamConsumer below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamPublisher:
    """Appends JSON-serialized payloads to a single Redis Stream."""

    def __init__(self, redis: Redis, stream: str) -> None:
        # Remember the client and the target stream name for later XADD calls.
        self.stream = stream
        self.redis = redis

    async def publish(self, data: dict) -> str:
        """Encode *data* as JSON and add it to the stream with XADD.

        The JSON text is stored under the single entry field ``data``.

        Returns the entry ID that Redis assigned to the new message.
        """
        payload = {"data": json.dumps(data)}
        entry_id = await self.redis.xadd(self.stream, payload)
        logger.debug("Published to %s: %s", self.stream, entry_id)
        return entry_id
|
|
|
|
|
|
class StreamConsumer:
    """Consumes JSON-encoded messages from a Redis Stream using consumer groups."""

    def __init__(self, redis: Redis, stream: str, group: str, consumer: str) -> None:
        self.redis = redis
        self.stream = stream
        self.group = group
        self.consumer = consumer

    async def ensure_group(self) -> None:
        """Create the consumer group if it does not already exist.

        The group is created starting at ID ``0`` and with ``mkstream=True``,
        so the call also works when the stream itself has not been created.

        Raises:
            Exception: any error from XGROUP CREATE whose message does not
                contain ``BUSYGROUP`` is re-raised unchanged.
        """
        try:
            await self.redis.xgroup_create(self.stream, self.group, id="0", mkstream=True)
            logger.info("Created consumer group %s on %s", self.group, self.stream)
        except Exception as exc:
            # BUSYGROUP means group already exists — expected on subsequent starts.
            if "BUSYGROUP" in str(exc):
                logger.debug("Consumer group %s already exists on %s", self.group, self.stream)
            else:
                raise

    @staticmethod
    def _load_payload(fields: dict) -> dict:
        """Return the decoded JSON stored under an entry's ``data`` field.

        Raises:
            KeyError: the entry has no ``data`` field.
            ValueError: the field content is not valid JSON.
        """
        # Field names are bytes unless the client decodes responses
        # (decode_responses=True), in which case they arrive as str.
        raw = fields[b"data"] if b"data" in fields else fields["data"]
        return json.loads(raw)

    async def consume(
        self, batch_size: int = 10, block_ms: int = 5000
    ) -> AsyncIterator[tuple[str, dict]]:
        """Yield ``(msg_id, data)`` tuples from the stream, forever.

        Ensures the consumer group exists, then repeatedly reads entries that
        were never delivered to this group (ID ``>``) with XREADGROUP.

        Each message is acknowledged (XACK) when the caller asks for the next
        item, i.e. after the caller has finished with the yielded message. If
        the caller stops iterating or raises while handling a message, that
        message is not acknowledged and stays pending in the group.

        Args:
            batch_size: maximum number of entries requested per XREADGROUP.
            block_ms: how long a single read may block, in milliseconds.

        Yields:
            ``(msg_id, data)`` where ``data`` is the decoded JSON payload and
            ``msg_id`` is passed through exactly as the client returned it.
            NOTE(review): presumably ``bytes`` unless the client decodes
            responses, despite the ``str`` annotation — confirm with callers.
        """
        await self.ensure_group()
        while True:
            try:
                messages = await self.redis.xreadgroup(
                    self.group,
                    self.consumer,
                    {self.stream: ">"},
                    count=batch_size,
                    block=block_ms,
                )
            except RedisTimeoutError:
                # redis-py raises this when a blocking read returns no data
                # within block_ms (idle stream). Expected — keep polling.
                continue
            for _stream_name, entries in messages:
                for msg_id, fields in entries:
                    data = self._load_payload(fields)
                    yield msg_id, data
                    # Runs only once the caller resumes the generator.
                    await self.redis.xack(self.stream, self.group, msg_id)
|