Wire the trading bot to real Alpaca market data and persist pipeline state to the database so the dashboard displays live information. - Add market-data service fetching OHLCV bars from Alpaca, publishing to market:bars Redis Stream; signal generator consumes bars and injects current_price into signals for position sizing - Sentiment analyzer now persists Article + ArticleSentiment rows to DB after scoring, with duplicate and error handling - API gateway runs a background portfolio sync task that snapshots Alpaca account state into PortfolioSnapshot/Position DB tables during market hours - TradeSignal carries a signal_id UUID; signal generator and trade executor both persist their records to DB with cross-references - 303 unit tests pass (57 new tests added)
119 lines
3.5 KiB
Python
119 lines
3.5 KiB
Python
"""FastAPI application — API Gateway for the trading bot."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from contextlib import asynccontextmanager
|
|
from typing import AsyncIterator
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from redis.asyncio import Redis
|
|
|
|
from services.api_gateway.auth.routes import router as auth_router
|
|
from services.api_gateway.config import ApiGatewayConfig
|
|
from shared.db import create_db
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_app(config: ApiGatewayConfig | None = None) -> FastAPI:
    """Build and configure the FastAPI application.

    Parameters
    ----------
    config:
        Optional config override (useful for testing).  If ``None``, a new
        :class:`ApiGatewayConfig` is created from environment variables.

    Returns
    -------
    FastAPI
        The application with CORS middleware, the auth, trading and
        WebSocket routers, and a ``/health`` endpoint registered.
    """
    if config is None:
        config = ApiGatewayConfig()

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[None]:
        """Start-up / shutdown hook — connect DB and Redis, run portfolio sync.

        Each resource is released in a ``finally`` clause as soon as it has
        been acquired, so a failure during start-up, an exception thrown in
        at ``yield``, or a crashed sync task cannot leak the Redis client or
        the DB engine.
        """
        # Database
        engine, session_factory = create_db(config)
        try:
            app.state.db_engine = engine
            app.state.db_session_factory = session_factory

            # Redis
            app.state.redis = Redis.from_url(
                config.redis_url, decode_responses=True
            )
            try:
                app.state.config = config

                # Start portfolio sync background task
                from services.api_gateway.tasks.portfolio_sync import (
                    portfolio_sync_loop,
                )

                sync_task = asyncio.create_task(
                    portfolio_sync_loop(config, session_factory)
                )
                try:
                    logger.info("API Gateway started")
                    yield
                finally:
                    # Cancel the sync task and wait for it to unwind.
                    sync_task.cancel()
                    try:
                        await sync_task
                    except asyncio.CancelledError:
                        pass
                    except Exception:
                        # The task had already failed on its own; awaiting it
                        # re-raises that error.  Log it rather than letting it
                        # abort the remaining cleanup.
                        logger.exception("Portfolio sync task failed")
            finally:
                # Read from app.state so a client swapped in after start-up
                # is the one that gets closed, as before.
                await app.state.redis.aclose()
        finally:
            await engine.dispose()
            logger.info("API Gateway stopped")

    app = FastAPI(
        title="Trading Bot API",
        version="0.1.0",
        lifespan=lifespan,
    )

    # CORS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=config.cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Auth routes (unauthenticated)
    app.include_router(auth_router)

    # Trading routes (authenticated) — imported lazily to avoid circular deps
    from services.api_gateway.routes.portfolio import router as portfolio_router
    from services.api_gateway.routes.trades import router as trades_router
    from services.api_gateway.routes.signals import router as signals_router
    from services.api_gateway.routes.strategies import router as strategies_router
    from services.api_gateway.routes.news import router as news_router
    from services.api_gateway.routes.controls import router as controls_router
    from services.api_gateway.routes.backtest import router as backtest_router

    app.include_router(portfolio_router)
    app.include_router(trades_router)
    app.include_router(signals_router)
    app.include_router(strategies_router)
    app.include_router(news_router)
    app.include_router(controls_router)
    app.include_router(backtest_router)

    # WebSocket
    from services.api_gateway.ws import router as ws_router

    app.include_router(ws_router)

    # Health check
    @app.get("/health", tags=["health"])
    async def health() -> dict:
        """Liveness probe; always reports ``{"status": "ok"}``."""
        return {"status": "ok"}

    return app
|
|
|
|
|
|
def get_app() -> FastAPI:
    """Return a freshly built app using the default configuration.

    Intended as uvicorn's factory entry point:
    ``uvicorn services.api_gateway.main:get_app --factory``.
    """
    application = create_app()
    return application
|