[project]
name = "trading-bot"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "sqlalchemy[asyncio]>=2.0",
    "asyncpg>=0.29",
    "redis>=5.0",
    "pydantic>=2.0",
    "pydantic-settings>=2.0",
    "opentelemetry-sdk>=1.20",
    "opentelemetry-exporter-prometheus>=0.45b",
    "opentelemetry-api>=1.20",
    "alembic>=1.13",
]

# Per-service dependency groups: each deployable installs only what it needs,
# e.g. `pip install .[api,dev]`.
[project.optional-dependencies]
api = ["fastapi>=0.110", "uvicorn[standard]>=0.27", "websockets>=12.0", "py-webauthn>=2.0", "pyjwt[crypto]>=2.8"]
news = ["feedparser>=6.0", "praw>=7.7", "httpx>=0.27"]
sentiment = ["transformers>=4.38", "torch>=2.2", "ollama>=0.1"]
trading = ["alpaca-py>=0.21"]
backtester = ["numpy>=1.26", "pandas>=2.2"]
dev = ["pytest>=8.0", "pytest-asyncio>=0.23", "pytest-cov>=4.1", "ruff>=0.3", "mypy>=1.8"]

[build-system]
requires = ["setuptools>=70.0"]
build-backend = "setuptools.build_meta"

[tool.setuptools.packages.find]
# Ship only the shared library code. Installing a top-level "tests" package
# would pollute site-packages and collide with any other distribution that
# packages its tests the same way; pytest discovers tests/ via testpaths below.
include = ["shared*"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

[tool.ruff]
line-length = 120
target-version = "py312"

[tool.mypy]
python_version = "3.12"
warn_return_any = true
warn_unused_configs = true
+ """ + + database_url: str = "postgresql+asyncpg://trading:trading@localhost:5432/trading" + redis_url: str = "redis://localhost:6379/0" + log_level: str = "INFO" + otel_service_name: str = "trading-bot" + otel_metrics_port: int = 9090 + + model_config = {"env_prefix": "TRADING_"} diff --git a/shared/redis_streams.py b/shared/redis_streams.py new file mode 100644 index 0000000..904d57a --- /dev/null +++ b/shared/redis_streams.py @@ -0,0 +1,68 @@ +"""Thin wrappers around redis-py Streams for publish/consume with JSON serialization.""" + +import json +import logging +from typing import AsyncIterator + +from redis.asyncio import Redis + +logger = logging.getLogger(__name__) + + +class StreamPublisher: + """Publishes JSON-encoded messages to a Redis Stream.""" + + def __init__(self, redis: Redis, stream: str) -> None: + self.redis = redis + self.stream = stream + + async def publish(self, data: dict) -> str: + """Serialize *data* as JSON and append to the stream via XADD. + + Returns the message ID assigned by Redis. + """ + msg_id = await self.redis.xadd(self.stream, {"data": json.dumps(data)}) + logger.debug("Published to %s: %s", self.stream, msg_id) + return msg_id + + +class StreamConsumer: + """Consumes JSON-encoded messages from a Redis Stream using consumer groups.""" + + def __init__(self, redis: Redis, stream: str, group: str, consumer: str) -> None: + self.redis = redis + self.stream = stream + self.group = group + self.consumer = consumer + + async def ensure_group(self) -> None: + """Create the consumer group if it does not already exist.""" + try: + await self.redis.xgroup_create(self.stream, self.group, id="0", mkstream=True) + logger.info("Created consumer group %s on %s", self.group, self.stream) + except Exception: + # Group already exists — this is expected on subsequent starts. 
+ pass + + async def consume( + self, batch_size: int = 10, block_ms: int = 5000 + ) -> AsyncIterator[tuple[str, dict]]: + """Yield ``(msg_id, data)`` tuples from the stream. + + Messages are acknowledged immediately after yielding so they + won't be redelivered to this consumer. + """ + await self.ensure_group() + while True: + messages = await self.redis.xreadgroup( + self.group, + self.consumer, + {self.stream: ">"}, + count=batch_size, + block=block_ms, + ) + for _stream_name, entries in messages: + for msg_id, fields in entries: + data = json.loads(fields[b"data"]) + yield msg_id, data + await self.redis.xack(self.stream, self.group, msg_id) diff --git a/shared/telemetry.py b/shared/telemetry.py new file mode 100644 index 0000000..fc8cbf4 --- /dev/null +++ b/shared/telemetry.py @@ -0,0 +1,21 @@ +"""OpenTelemetry setup with Prometheus metric export.""" + +from opentelemetry import metrics +from opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.sdk.metrics import MeterProvider +from prometheus_client import start_http_server + + +def setup_telemetry(service_name: str, metrics_port: int = 9090) -> metrics.Meter: + """Initialise an OpenTelemetry MeterProvider backed by Prometheus. + + A Prometheus-compatible HTTP server is started on *metrics_port* + so that an external Prometheus instance can scrape ``/metrics``. + + Returns a ``Meter`` that services use to create counters, histograms, etc. 
+ """ + reader = PrometheusMetricReader() + provider = MeterProvider(metric_readers=[reader]) + metrics.set_meter_provider(provider) + start_http_server(metrics_port) + return metrics.get_meter(service_name) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_redis_streams.py b/tests/test_redis_streams.py new file mode 100644 index 0000000..6363565 --- /dev/null +++ b/tests/test_redis_streams.py @@ -0,0 +1,97 @@ +"""Tests for the Redis Streams publish/consume helpers.""" + +import json +from unittest.mock import AsyncMock + +import pytest + +from shared.redis_streams import StreamConsumer, StreamPublisher + + +@pytest.mark.asyncio +async def test_publisher_publishes_json() -> None: + """StreamPublisher should XADD a JSON-serialised payload.""" + redis = AsyncMock() + redis.xadd = AsyncMock(return_value=b"1-0") + + pub = StreamPublisher(redis, "test:stream") + msg_id = await pub.publish({"ticker": "AAPL", "score": 0.8}) + + redis.xadd.assert_called_once_with( + "test:stream", + {"data": json.dumps({"ticker": "AAPL", "score": 0.8})}, + ) + assert msg_id == b"1-0" + + +@pytest.mark.asyncio +async def test_publisher_returns_message_id() -> None: + """The returned value must be the raw message ID from Redis.""" + redis = AsyncMock() + redis.xadd = AsyncMock(return_value=b"99999-42") + + pub = StreamPublisher(redis, "events") + result = await pub.publish({"key": "value"}) + + assert result == b"99999-42" + + +@pytest.mark.asyncio +async def test_consumer_ensure_group_creates_group() -> None: + """ensure_group should call XGROUP CREATE with mkstream=True.""" + redis = AsyncMock() + redis.xgroup_create = AsyncMock() + + consumer = StreamConsumer(redis, "test:stream", "my-group", "worker-1") + await consumer.ensure_group() + + redis.xgroup_create.assert_called_once_with( + "test:stream", "my-group", id="0", mkstream=True + ) + + +@pytest.mark.asyncio +async def test_consumer_ensure_group_ignores_existing() 
-> None: + """If the group already exists the exception should be swallowed.""" + redis = AsyncMock() + redis.xgroup_create = AsyncMock(side_effect=Exception("BUSYGROUP")) + + consumer = StreamConsumer(redis, "test:stream", "my-group", "worker-1") + # Should not raise + await consumer.ensure_group() + + +@pytest.mark.asyncio +async def test_consumer_consume_yields_and_acks() -> None: + """consume() should yield deserialised data and ACK each message.""" + redis = AsyncMock() + redis.xgroup_create = AsyncMock() + + payload = {"ticker": "TSLA", "direction": "LONG"} + # xreadgroup returns list of (stream, [(msg_id, fields), ...]) + redis.xreadgroup = AsyncMock( + side_effect=[ + [ + ( + b"test:stream", + [(b"1-0", {b"data": json.dumps(payload).encode()})], + ) + ], + # Second call returns empty to break the loop in the test + KeyboardInterrupt, + ] + ) + redis.xack = AsyncMock() + + consumer = StreamConsumer(redis, "test:stream", "grp", "c1") + + results: list[tuple[str, dict]] = [] + try: + async for msg_id, data in consumer.consume(batch_size=1, block_ms=100): + results.append((msg_id, data)) + except KeyboardInterrupt: + pass + + assert len(results) == 1 + assert results[0] == (b"1-0", payload) + redis.xack.assert_called_once_with("test:stream", "grp", b"1-0")