"""Thin wrappers around redis-py Streams for publish/consume with JSON serialization.""" import json import logging from typing import AsyncIterator from redis.asyncio import Redis logger = logging.getLogger(__name__) class StreamPublisher: """Publishes JSON-encoded messages to a Redis Stream.""" def __init__(self, redis: Redis, stream: str) -> None: self.redis = redis self.stream = stream async def publish(self, data: dict) -> str: """Serialize *data* as JSON and append to the stream via XADD. Returns the message ID assigned by Redis. """ msg_id = await self.redis.xadd(self.stream, {"data": json.dumps(data)}) logger.debug("Published to %s: %s", self.stream, msg_id) return msg_id class StreamConsumer: """Consumes JSON-encoded messages from a Redis Stream using consumer groups.""" def __init__(self, redis: Redis, stream: str, group: str, consumer: str) -> None: self.redis = redis self.stream = stream self.group = group self.consumer = consumer async def ensure_group(self) -> None: """Create the consumer group if it does not already exist.""" try: await self.redis.xgroup_create(self.stream, self.group, id="0", mkstream=True) logger.info("Created consumer group %s on %s", self.group, self.stream) except Exception as exc: # BUSYGROUP means group already exists — expected on subsequent starts. if "BUSYGROUP" in str(exc): logger.debug("Consumer group %s already exists on %s", self.group, self.stream) else: raise async def consume( self, batch_size: int = 10, block_ms: int = 5000 ) -> AsyncIterator[tuple[str, dict]]: """Yield ``(msg_id, data)`` tuples from the stream. Messages are acknowledged immediately after yielding so they won't be redelivered to this consumer. """ await self.ensure_group() while True: messages = await self.redis.xreadgroup( self.group, self.consumer, {self.stream: ">"}, count=batch_size, block=block_ms, ) for _stream_name, entries in messages: for msg_id, fields in entries: data = json.loads(fields[b"data"]) yield msg_id, data await self.redis.xack(self.stream, self.group, msg_id)