"""Tests for the Redis Streams publish/consume helpers.""" import json from unittest.mock import AsyncMock import pytest from shared.redis_streams import StreamConsumer, StreamPublisher @pytest.mark.asyncio async def test_publisher_publishes_json() -> None: """StreamPublisher should XADD a JSON-serialised payload.""" redis = AsyncMock() redis.xadd = AsyncMock(return_value=b"1-0") pub = StreamPublisher(redis, "test:stream") msg_id = await pub.publish({"ticker": "AAPL", "score": 0.8}) redis.xadd.assert_called_once_with( "test:stream", {"data": json.dumps({"ticker": "AAPL", "score": 0.8})}, ) assert msg_id == b"1-0" @pytest.mark.asyncio async def test_publisher_returns_message_id() -> None: """The returned value must be the raw message ID from Redis.""" redis = AsyncMock() redis.xadd = AsyncMock(return_value=b"99999-42") pub = StreamPublisher(redis, "events") result = await pub.publish({"key": "value"}) assert result == b"99999-42" @pytest.mark.asyncio async def test_consumer_ensure_group_creates_group() -> None: """ensure_group should call XGROUP CREATE with mkstream=True.""" redis = AsyncMock() redis.xgroup_create = AsyncMock() consumer = StreamConsumer(redis, "test:stream", "my-group", "worker-1") await consumer.ensure_group() redis.xgroup_create.assert_called_once_with( "test:stream", "my-group", id="0", mkstream=True ) @pytest.mark.asyncio async def test_consumer_ensure_group_ignores_existing() -> None: """If the group already exists the exception should be swallowed.""" redis = AsyncMock() redis.xgroup_create = AsyncMock(side_effect=Exception("BUSYGROUP")) consumer = StreamConsumer(redis, "test:stream", "my-group", "worker-1") # Should not raise await consumer.ensure_group() @pytest.mark.asyncio async def test_consumer_consume_yields_and_acks() -> None: """consume() should yield deserialised data and ACK each message.""" redis = AsyncMock() redis.xgroup_create = AsyncMock() payload = {"ticker": "TSLA", "direction": "LONG"} # xreadgroup returns list of (stream, [(msg_id, fields), ...]) redis.xreadgroup = AsyncMock( side_effect=[ [ ( b"test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})], ) ], # Second call returns empty to break the loop in the test KeyboardInterrupt, ] ) redis.xack = AsyncMock() consumer = StreamConsumer(redis, "test:stream", "grp", "c1") results: list[tuple[str, dict]] = [] try: async for msg_id, data in consumer.consume(batch_size=1, block_ms=100): results.append((msg_id, data)) except KeyboardInterrupt: pass assert len(results) == 1 assert results[0] == (b"1-0", payload) redis.xack.assert_called_once_with("test:stream", "grp", b"1-0")