"""Market Data service -- main entry point.

Fetches historical and live OHLCV bars from Alpaca's market data API and
publishes them to the ``market:bars`` Redis Stream for consumption by the
signal generator and other downstream services.
"""

from __future__ import annotations

import asyncio
import logging
import signal
from datetime import datetime, timedelta, timezone

from redis.asyncio import Redis

from services.market_data.config import MarketDataConfig
from shared.redis_streams import StreamPublisher
from shared.telemetry import setup_telemetry

logger = logging.getLogger(__name__)

MARKET_BARS_STREAM = "market:bars"


def _parse_timeframe(timeframe_str: str):
    """Parse a timeframe string like '5Min' into an Alpaca TimeFrame object.

    Args:
        timeframe_str: One of "1Min", "5Min", "15Min", "1Hour", "1Day".
            The match is exact and case-sensitive.

    Returns:
        A ``TimeFrame`` instance suitable for ``StockBarsRequest``.

    Raises:
        ValueError: If ``timeframe_str`` is not one of the supported values.
    """
    # Imported lazily, like every other alpaca import in this module.
    from alpaca.data.timeframe import TimeFrame, TimeFrameUnit

    # Supported formats: "1Min", "5Min", "15Min", "1Hour", "1Day"
    tf_map = {
        "1Min": TimeFrame(1, TimeFrameUnit.Minute),
        "5Min": TimeFrame(5, TimeFrameUnit.Minute),
        "15Min": TimeFrame(15, TimeFrameUnit.Minute),
        "1Hour": TimeFrame(1, TimeFrameUnit.Hour),
        "1Day": TimeFrame(1, TimeFrameUnit.Day),
    }

    tf = tf_map.get(timeframe_str)
    if tf is None:
        raise ValueError(
            f"Unsupported timeframe '{timeframe_str}'. "
            f"Supported values: {list(tf_map.keys())}"
        )
    return tf


def _bar_to_dict(ticker: str, bar) -> dict:
    """Convert an Alpaca Bar object to a flat dictionary for Redis publishing.

    Args:
        ticker: Symbol the bar belongs to.
        bar: Object exposing ``timestamp`` (with ``isoformat()``), ``open``,
            ``high``, ``low``, ``close`` and ``volume`` attributes.

    Returns:
        A dict with the ticker, the ISO-8601 timestamp string, and the five
        OHLCV values coerced to ``float``.
    """
    return {
        "ticker": ticker,
        "timestamp": bar.timestamp.isoformat(),
        "open": float(bar.open),
        "high": float(bar.high),
        "low": float(bar.low),
        "close": float(bar.close),
        "volume": float(bar.volume),
    }


async def _fetch_historical_bars(
    client,
    watchlist: list[str],
    timeframe,
    limit: int,
    publisher: StreamPublisher,
    bars_published_counter,
) -> int:
    """Fetch the most recent historical bars for each ticker and publish them.

    For every ticker, up to ``limit`` bars from the last 30 days are
    requested newest-first, then published oldest-first so the stream stays
    in chronological order. A failure for one ticker is logged and does not
    stop the remaining tickers.

    Args:
        client: Alpaca data client exposing a blocking ``get_stock_bars``.
        watchlist: Ticker symbols to backfill.
        timeframe: Alpaca ``TimeFrame`` for the bars.
        limit: Maximum number of bars to request per ticker.
        publisher: Stream publisher the bar dictionaries are sent to.
        bars_published_counter: Metrics counter; incremented once with the
            total number of bars published (skipped when the total is zero).

    Returns:
        The total number of bars published.
    """
    from alpaca.common.enums import Sort
    from alpaca.data.enums import DataFeed
    from alpaca.data.requests import StockBarsRequest

    total_published = 0

    # Use a start time far enough back to get the requested number of bars
    start = datetime.now(timezone.utc) - timedelta(days=30)

    for ticker in watchlist:
        try:
            request = StockBarsRequest(
                symbol_or_symbols=[ticker],
                timeframe=timeframe,
                start=start,
                limit=limit,
                # With the default ascending order, ``limit`` keeps the
                # OLDEST bars after ``start`` (i.e. ~30-day-old data).
                # Requesting newest-first makes ``limit`` keep the most
                # recent bars instead.
                sort=Sort.DESC,
                feed=DataFeed.IEX,
            )
            # The Alpaca client is synchronous; run it off the event loop.
            bars = await asyncio.to_thread(client.get_stock_bars, request)

            try:
                ticker_bars = bars[ticker]
            except (KeyError, IndexError):
                # No data returned for this symbol.
                ticker_bars = []

            # Publish in chronological order regardless of response order.
            for bar in sorted(ticker_bars, key=lambda b: b.timestamp):
                msg = _bar_to_dict(ticker, bar)
                await publisher.publish(msg)
                total_published += 1

            logger.info(
                "Published %d historical bars for %s",
                len(ticker_bars),
                ticker,
            )
        except Exception:
            # Best effort per ticker: log and continue with the next one.
            logger.exception("Failed to fetch historical bars for %s", ticker)

    if total_published:
        bars_published_counter.add(total_published)

    return total_published


async def _poll_latest_bars(
    client,
    watchlist: list[str],
    timeframe,
    publisher: StreamPublisher,
    bars_published_counter,
) -> int:
    """Fetch the latest bar for each ticker and publish it to Redis.

    Looks at the bars of the last 10 minutes for each ticker and publishes
    only the one with the greatest timestamp. A failure for one ticker is
    logged and does not stop the remaining tickers.

    Args:
        client: Alpaca data client exposing a blocking ``get_stock_bars``.
        watchlist: Ticker symbols to poll.
        timeframe: Alpaca ``TimeFrame`` for the bars.
        publisher: Stream publisher the bar dictionaries are sent to.
        bars_published_counter: Metrics counter; incremented once with the
            number of bars published (skipped when that number is zero).

    Returns:
        The number of bars published.
    """
    from alpaca.data.enums import DataFeed
    from alpaca.data.requests import StockBarsRequest

    published = 0

    # Fetch bars from the last 10 minutes to ensure we get at least one
    # NOTE(review): for timeframes longer than this window (e.g. "1Hour",
    # "1Day") a bar presumably falls inside it only occasionally -- confirm
    # whether the window should scale with the timeframe.
    start = datetime.now(timezone.utc) - timedelta(minutes=10)

    for ticker in watchlist:
        try:
            # No ``limit`` here: with the default ascending order,
            # ``limit=1`` would return the OLDEST bar in the window rather
            # than the latest. The window is small, so fetch it whole.
            request = StockBarsRequest(
                symbol_or_symbols=[ticker],
                timeframe=timeframe,
                start=start,
                feed=DataFeed.IEX,
            )
            # The Alpaca client is synchronous; run it off the event loop.
            bars = await asyncio.to_thread(client.get_stock_bars, request)

            try:
                ticker_bars = bars[ticker]
            except (KeyError, IndexError):
                # No data returned for this symbol.
                ticker_bars = []

            if ticker_bars:
                # Publish only the most recent bar
                bar = max(ticker_bars, key=lambda b: b.timestamp)
                msg = _bar_to_dict(ticker, bar)
                await publisher.publish(msg)
                published += 1
                logger.debug(
                    "Published latest bar for %s: close=%.2f",
                    ticker,
                    bar.close,
                )
        except Exception:
            # Best effort per ticker: log and continue with the next one.
            logger.exception("Failed to fetch latest bar for %s", ticker)

    if published:
        bars_published_counter.add(published)

    return published


async def run(config: MarketDataConfig | None = None) -> None:
    """Main service loop.

    Connects to Alpaca and Redis, fetches historical bars on startup, then
    polls for new bars at the configured interval until SIGTERM or SIGINT
    is received. The Redis connection is closed on exit.

    Args:
        config: Service configuration; a default ``MarketDataConfig()`` is
            created when omitted.

    Raises:
        ValueError: If ``config.bar_timeframe`` is not a supported timeframe.
    """
    if config is None:
        config = MarketDataConfig()

    logging.basicConfig(level=config.log_level)
    logger.info("Starting Market Data service")

    # --- Telemetry ---
    meter = setup_telemetry("market-data", config.otel_metrics_port)
    bars_published_counter = meter.create_counter(
        "market_data.bars_published",
        description="Total OHLCV bars published to market:bars stream",
    )
    poll_errors_counter = meter.create_counter(
        "market_data.poll_errors",
        description="Total poll cycle errors",
    )

    # --- Alpaca client ---
    from alpaca.data.historical import StockHistoricalDataClient

    client = StockHistoricalDataClient(
        api_key=config.alpaca_api_key,
        secret_key=config.alpaca_secret_key,
    )

    # --- Redis ---
    redis = Redis.from_url(config.redis_url, decode_responses=False)
    publisher = StreamPublisher(redis, MARKET_BARS_STREAM)

    # --- Parse timeframe ---
    timeframe = _parse_timeframe(config.bar_timeframe)

    # --- Graceful shutdown ---
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown_event.set)

    try:
        # Fetch historical bars on startup
        logger.info(
            "Fetching %d historical bars for watchlist: %s",
            config.historical_bars,
            config.watchlist,
        )
        total = await _fetch_historical_bars(
            client,
            config.watchlist,
            timeframe,
            config.historical_bars,
            publisher,
            bars_published_counter,
        )
        logger.info("Historical backfill complete: %d total bars published", total)

        # Poll loop
        logger.info(
            "Starting poll loop (interval=%ds) for watchlist: %s",
            config.poll_interval_seconds,
            config.watchlist,
        )
        while not shutdown_event.is_set():
            # Sleep for one poll interval, waking early on shutdown.
            try:
                await asyncio.wait_for(
                    shutdown_event.wait(),
                    timeout=config.poll_interval_seconds,
                )
                break  # Shutdown signaled
            except asyncio.TimeoutError:
                pass  # Normal timeout -- time to poll

            try:
                count = await _poll_latest_bars(
                    client,
                    config.watchlist,
                    timeframe,
                    publisher,
                    bars_published_counter,
                )
                logger.info("Poll cycle complete: %d bars published", count)
            except Exception:
                # Keep the service alive; record the failed cycle.
                logger.exception("Poll cycle failed")
                poll_errors_counter.add(1)
    finally:
        await redis.aclose()
        logger.info("Market data service stopped gracefully")


def main() -> None:
    """CLI entry point."""
    asyncio.run(run())


if __name__ == "__main__":
    main()