feat: project foundation — monorepo setup, shared config, redis streams, telemetry
- pyproject.toml with core deps and optional dep groups per service - shared/config.py: Pydantic BaseSettings with TRADING_ env prefix - shared/redis_streams.py: StreamPublisher/StreamConsumer wrappers - shared/telemetry.py: OpenTelemetry + Prometheus metric export - tests for Redis Streams helpers (5 passing)
This commit is contained in:
parent
0ac9884b89
commit
ae5b3f89d1
7 changed files with 248 additions and 0 deletions
68
shared/redis_streams.py
Normal file
68
shared/redis_streams.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
"""Thin wrappers around redis-py Streams for publish/consume with JSON serialization."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import AsyncIterator
|
||||
|
||||
from redis.asyncio import Redis
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StreamPublisher:
|
||||
"""Publishes JSON-encoded messages to a Redis Stream."""
|
||||
|
||||
def __init__(self, redis: Redis, stream: str) -> None:
|
||||
self.redis = redis
|
||||
self.stream = stream
|
||||
|
||||
async def publish(self, data: dict) -> str:
|
||||
"""Serialize *data* as JSON and append to the stream via XADD.
|
||||
|
||||
Returns the message ID assigned by Redis.
|
||||
"""
|
||||
msg_id = await self.redis.xadd(self.stream, {"data": json.dumps(data)})
|
||||
logger.debug("Published to %s: %s", self.stream, msg_id)
|
||||
return msg_id
|
||||
|
||||
|
||||
class StreamConsumer:
|
||||
"""Consumes JSON-encoded messages from a Redis Stream using consumer groups."""
|
||||
|
||||
def __init__(self, redis: Redis, stream: str, group: str, consumer: str) -> None:
|
||||
self.redis = redis
|
||||
self.stream = stream
|
||||
self.group = group
|
||||
self.consumer = consumer
|
||||
|
||||
async def ensure_group(self) -> None:
|
||||
"""Create the consumer group if it does not already exist."""
|
||||
try:
|
||||
await self.redis.xgroup_create(self.stream, self.group, id="0", mkstream=True)
|
||||
logger.info("Created consumer group %s on %s", self.group, self.stream)
|
||||
except Exception:
|
||||
# Group already exists — this is expected on subsequent starts.
|
||||
pass
|
||||
|
||||
async def consume(
|
||||
self, batch_size: int = 10, block_ms: int = 5000
|
||||
) -> AsyncIterator[tuple[str, dict]]:
|
||||
"""Yield ``(msg_id, data)`` tuples from the stream.
|
||||
|
||||
Messages are acknowledged immediately after yielding so they
|
||||
won't be redelivered to this consumer.
|
||||
"""
|
||||
await self.ensure_group()
|
||||
while True:
|
||||
messages = await self.redis.xreadgroup(
|
||||
self.group,
|
||||
self.consumer,
|
||||
{self.stream: ">"},
|
||||
count=batch_size,
|
||||
block=block_ms,
|
||||
)
|
||||
for _stream_name, entries in messages:
|
||||
for msg_id, fields in entries:
|
||||
data = json.loads(fields[b"data"])
|
||||
yield msg_id, data
|
||||
await self.redis.xack(self.stream, self.group, msg_id)
|
||||
Loading…
Add table
Add a link
Reference in a new issue