trading/services/market_data/main.py
Viktor Barzin d36ae40df1
feat: productionize local service — fix signal pipeline, lower thresholds, add company-name ticker extraction
- Point Ollama to local instance via host.docker.internal, use gemma3 model
- Remove Docker Ollama service (using host's Ollama instead)
- Add company-name-to-ticker mapping (Apple→AAPL, Tesla→TSLA, etc.) for RSS articles
- Lower signal thresholds for faster feedback with paper trading:
  - FinBERT confidence: 0.6→0.4, signal strength: 0.3→0.15
  - News strategy: article_count 2→1, confidence 0.5→0.3, score ±0.3→±0.15
- Fix market data BarSet access bug (BarSet.__contains__ returns False incorrectly)
- Fix market data SIP feed error by switching to IEX feed for free Alpaca accounts
- Fix nginx proxy routing for /api/auth/* to api-gateway /auth/*
- Add seed_sample_data script
- Update tests for new thresholds and alpaca mock modules
2026-02-22 22:17:26 +00:00

267 lines
7.9 KiB
Python

"""Market Data service -- main entry point.
Fetches historical and live OHLCV bars from Alpaca's market data API
and publishes them to the ``market:bars`` Redis Stream for consumption
by the signal generator and other downstream services.
"""
from __future__ import annotations
import asyncio
import logging
import signal
from datetime import datetime, timedelta, timezone
from redis.asyncio import Redis
from services.market_data.config import MarketDataConfig
from shared.redis_streams import StreamPublisher
from shared.telemetry import setup_telemetry
logger = logging.getLogger(__name__)
MARKET_BARS_STREAM = "market:bars"
def _parse_timeframe(timeframe_str: str):
    """Translate a timeframe label (e.g. ``'5Min'``) into an Alpaca ``TimeFrame``.

    Raises:
        ValueError: if *timeframe_str* is not one of the supported labels.
    """
    from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
    # Supported labels mapped to (amount, unit) pairs for TimeFrame().
    supported = {
        "1Min": (1, TimeFrameUnit.Minute),
        "5Min": (5, TimeFrameUnit.Minute),
        "15Min": (15, TimeFrameUnit.Minute),
        "1Hour": (1, TimeFrameUnit.Hour),
        "1Day": (1, TimeFrameUnit.Day),
    }
    try:
        amount, unit = supported[timeframe_str]
    except KeyError:
        raise ValueError(
            f"Unsupported timeframe '{timeframe_str}'. "
            f"Supported values: {list(supported.keys())}"
        ) from None
    return TimeFrame(amount, unit)
def _bar_to_dict(ticker: str, bar) -> dict:
"""Convert an Alpaca Bar object to a flat dictionary for Redis publishing."""
return {
"ticker": ticker,
"timestamp": bar.timestamp.isoformat(),
"open": float(bar.open),
"high": float(bar.high),
"low": float(bar.low),
"close": float(bar.close),
"volume": float(bar.volume),
}
async def _fetch_historical_bars(
    client,
    watchlist: list[str],
    timeframe,
    limit: int,
    publisher: StreamPublisher,
    bars_published_counter,
) -> int:
    """Backfill historical bars for every watched ticker.

    Fetches up to *limit* bars per ticker from the last 30 days and publishes
    each one to the Redis stream. Per-ticker failures are logged and skipped
    so one bad symbol cannot abort the backfill.

    Returns:
        Total number of bars published across all tickers.
    """
    from alpaca.data.enums import DataFeed
    from alpaca.data.requests import StockBarsRequest

    published_total = 0
    # 30 days back is a wide enough window to satisfy `limit` bars at any
    # supported timeframe.
    window_start = datetime.now(timezone.utc) - timedelta(days=30)
    for symbol in watchlist:
        try:
            req = StockBarsRequest(
                symbol_or_symbols=[symbol],
                timeframe=timeframe,
                start=window_start,
                limit=limit,
                feed=DataFeed.IEX,  # IEX feed works on free Alpaca accounts
            )
            bar_set = await asyncio.to_thread(client.get_stock_bars, req)
            # BarSet.__contains__ misreports membership, so index and catch
            # instead of testing `symbol in bar_set`.
            try:
                symbol_bars = bar_set[symbol]
            except (KeyError, IndexError):
                symbol_bars = []
            for bar in symbol_bars:
                await publisher.publish(_bar_to_dict(symbol, bar))
                published_total += 1
            logger.info(
                "Published %d historical bars for %s", len(symbol_bars), symbol
            )
        except Exception:
            logger.exception("Failed to fetch historical bars for %s", symbol)
    if published_total:
        bars_published_counter.add(published_total)
    return published_total
async def _poll_latest_bars(
    client,
    watchlist: list[str],
    timeframe,
    publisher: StreamPublisher,
    bars_published_counter,
) -> int:
    """Fetch the most recent bar for each ticker and publish it to Redis.

    Per-ticker failures are logged and skipped so one bad symbol cannot
    abort the poll cycle.

    Returns:
        Number of bars published this cycle (at most one per ticker).
    """
    from alpaca.data.enums import DataFeed
    from alpaca.data.requests import StockBarsRequest

    published = 0
    # Look back 10 minutes so at least one completed bar falls in the window.
    start = datetime.now(timezone.utc) - timedelta(minutes=10)
    for ticker in watchlist:
        try:
            # NOTE: deliberately no `limit` here. Alpaca returns bars in
            # ascending time order and `limit` truncates from the START of
            # the window, so the previous `limit=1` repeatedly yielded the
            # OLDEST bar of the last 10 minutes instead of the newest.
            # Fetch the whole (small) window and take the last bar below.
            request = StockBarsRequest(
                symbol_or_symbols=[ticker],
                timeframe=timeframe,
                start=start,
                feed=DataFeed.IEX,  # IEX feed works on free Alpaca accounts
            )
            bars = await asyncio.to_thread(client.get_stock_bars, request)
            # BarSet.__contains__ misreports membership, so index and catch
            # instead of testing `ticker in bars`.
            try:
                ticker_bars = bars[ticker]
            except (KeyError, IndexError):
                ticker_bars = []
            if ticker_bars:
                # Publish only the most recent bar in the window.
                bar = ticker_bars[-1]
                msg = _bar_to_dict(ticker, bar)
                await publisher.publish(msg)
                published += 1
                logger.debug(
                    "Published latest bar for %s: close=%.2f", ticker, bar.close
                )
        except Exception:
            logger.exception("Failed to fetch latest bar for %s", ticker)
    if published:
        bars_published_counter.add(published)
    return published
async def run(config: MarketDataConfig | None = None) -> None:
    """Main service loop.

    Connects to Alpaca and Redis, backfills historical bars on startup, then
    polls for fresh bars every ``config.poll_interval_seconds`` until a
    SIGTERM/SIGINT sets the shutdown event.

    Args:
        config: Service configuration. When omitted, a ``MarketDataConfig``
            is constructed (presumably reading the environment — see the
            config class).
    """
    if config is None:
        config = MarketDataConfig()
    logging.basicConfig(level=config.log_level)
    logger.info("Starting Market Data service")
    # --- Telemetry ---
    meter = setup_telemetry("market-data", config.otel_metrics_port)
    bars_published_counter = meter.create_counter(
        "market_data.bars_published",
        description="Total OHLCV bars published to market:bars stream",
    )
    poll_errors_counter = meter.create_counter(
        "market_data.poll_errors",
        description="Total poll cycle errors",
    )
    # --- Alpaca client (synchronous; calls are offloaded via to_thread) ---
    from alpaca.data.historical import StockHistoricalDataClient
    client = StockHistoricalDataClient(
        api_key=config.alpaca_api_key,
        secret_key=config.alpaca_secret_key,
    )
    # --- Redis ---
    # decode_responses=False: stream payload encoding is handled by the
    # publisher, not by the client.
    redis = Redis.from_url(config.redis_url, decode_responses=False)
    publisher = StreamPublisher(redis, MARKET_BARS_STREAM)
    # --- Parse timeframe (fail fast on bad config) ---
    timeframe = _parse_timeframe(config.bar_timeframe)
    # --- Graceful shutdown ---
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Track what we actually registered so we can unregister it on exit.
    registered_signals: list[signal.Signals] = []
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, shutdown_event.set)
            registered_signals.append(sig)
        except NotImplementedError:
            # add_signal_handler is unavailable on some platforms (e.g.
            # Windows event loops); fall back to default signal behavior.
            logger.warning("Cannot install handler for %s on this platform", sig)
    try:
        # Fetch historical bars on startup
        logger.info(
            "Fetching %d historical bars for watchlist: %s",
            config.historical_bars,
            config.watchlist,
        )
        total = await _fetch_historical_bars(
            client,
            config.watchlist,
            timeframe,
            config.historical_bars,
            publisher,
            bars_published_counter,
        )
        logger.info("Historical backfill complete: %d total bars published", total)
        # Poll loop
        logger.info(
            "Starting poll loop (interval=%ds) for watchlist: %s",
            config.poll_interval_seconds,
            config.watchlist,
        )
        while not shutdown_event.is_set():
            # Sleep for the poll interval, but wake immediately on shutdown.
            try:
                await asyncio.wait_for(
                    shutdown_event.wait(),
                    timeout=config.poll_interval_seconds,
                )
                break  # Shutdown signaled
            except asyncio.TimeoutError:
                pass  # Normal timeout — time to poll
            try:
                count = await _poll_latest_bars(
                    client,
                    config.watchlist,
                    timeframe,
                    publisher,
                    bars_published_counter,
                )
                logger.info("Poll cycle complete: %d bars published", count)
            except Exception:
                # Keep the service alive across transient poll failures;
                # surface them via the error counter instead.
                logger.exception("Poll cycle failed")
                poll_errors_counter.add(1)
    finally:
        # Unregister our handlers so a reused event loop is left clean.
        for sig in registered_signals:
            loop.remove_signal_handler(sig)
        await redis.aclose()
        logger.info("Market data service stopped gracefully")
def main() -> None:
    """CLI entry point: run the service event loop until it exits."""
    asyncio.run(run())
if __name__ == "__main__":
main()