trading/shared/redis_streams.py
Viktor Barzin 5a6b20c8f1
fix: resolve 13 important issues from code review
I1: Add graceful shutdown (SIGTERM/SIGINT) to all 5 background services
I2: Fix Dockerfile healthcheck to use curl on /metrics endpoint
I3: Fix StreamConsumer.ensure_group() to only catch BUSYGROUP errors
I4: Fix SimulatedBroker to reject orders with insufficient cash/shares
I5: Move ORM attribute access inside DB session context in trades routes
I6: Add Redis-based rate limiting (10 req/min/IP) on all auth endpoints
I8: Prevent backtest background task garbage collection
I9: Use Numeric(16,6) instead of Float for financial columns in migration
I10: Add index on trades.created_at for time-range queries
I11: Bind infrastructure ports to 127.0.0.1 in docker-compose
I12: Add migrations init service; all app services depend on it
I13: Fix user enumeration in login_begin (return options for non-existent users)
2026-02-22 17:58:01 +00:00

71 lines
2.5 KiB
Python

"""Thin wrappers around redis-py Streams for publish/consume with JSON serialization."""
import json
import logging
from typing import AsyncIterator
from redis.asyncio import Redis
logger = logging.getLogger(__name__)
class StreamPublisher:
    """Appends JSON-encoded messages to a single Redis Stream."""

    def __init__(self, redis: Redis, stream: str) -> None:
        self.redis = redis
        self.stream = stream

    async def publish(self, data: dict) -> str:
        """XADD *data* to the stream, JSON-serialized under a ``data`` field.

        Returns the message ID that Redis assigned to the new entry.
        """
        payload = {"data": json.dumps(data)}
        message_id = await self.redis.xadd(self.stream, payload)
        logger.debug("Published to %s: %s", self.stream, message_id)
        return message_id
class StreamConsumer:
    """Consumes JSON-encoded messages from a Redis Stream using consumer groups.

    Consumers in the same group receive disjoint subsets of the stream;
    acknowledged entries are not redelivered.
    """

    def __init__(self, redis: Redis, stream: str, group: str, consumer: str) -> None:
        self.redis = redis
        self.stream = stream
        self.group = group
        self.consumer = consumer

    async def ensure_group(self) -> None:
        """Create the consumer group if it does not already exist.

        Raises:
            ResponseError: for any Redis command error other than BUSYGROUP
                (which just means the group already exists and is benign).
        """
        try:
            await self.redis.xgroup_create(self.stream, self.group, id="0", mkstream=True)
            logger.info("Created consumer group %s on %s", self.group, self.stream)
        except ResponseError as exc:
            # Narrowed from a bare ``except Exception``: only Redis command
            # errors can carry BUSYGROUP; connection failures and programming
            # errors should propagate untouched.
            if "BUSYGROUP" in str(exc):
                logger.debug("Consumer group %s already exists on %s", self.group, self.stream)
            else:
                raise

    async def consume(
        self, batch_size: int = 10, block_ms: int = 5000
    ) -> AsyncIterator[tuple[str, dict]]:
        """Yield ``(msg_id, data)`` tuples from the stream.

        Messages are acknowledged immediately after yielding so they
        won't be redelivered to this consumer. Entries whose payload is
        not valid JSON are logged, acknowledged, and skipped — otherwise a
        single poison message would raise out of the generator and sit in
        the pending-entries list forever (``">"`` never re-reads the PEL).

        Args:
            batch_size: maximum entries fetched per XREADGROUP call.
            block_ms: milliseconds each read blocks waiting for new entries.
        """
        await self.ensure_group()
        while True:
            messages = await self.redis.xreadgroup(
                self.group,
                self.consumer,
                {self.stream: ">"},
                count=batch_size,
                block=block_ms,
            )
            for _stream_name, entries in messages:
                for msg_id, fields in entries:
                    # Field keys are ``bytes`` unless the client was created
                    # with decode_responses=True — accept both forms.
                    raw = fields.get(b"data", fields.get("data"))
                    try:
                        data = json.loads(raw)
                    except (TypeError, ValueError):
                        # TypeError: missing field (raw is None);
                        # ValueError covers json.JSONDecodeError.
                        logger.exception(
                            "Dropping malformed message %s on %s", msg_id, self.stream
                        )
                        await self.redis.xack(self.stream, self.group, msg_id)
                        continue
                    yield msg_id, data
                    await self.redis.xack(self.stream, self.group, msg_id)