From 82dc622544f7470f6648bcb3fdd8296cdf320f36 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Thu, 4 Jun 2026 22:31:24 +0000 Subject: [PATCH] feat(kevin): reconcile Alpaca bracket auto-closes + order status Bracket stop-loss/take-profit legs fill at Alpaca without passing through the executor, so those closes (and their P&L) were invisible locally. - broker: add get_order(nested) + list_orders to BaseBroker/AlpacaBroker (+ SimulatedBroker); BrokerOrder carries child legs - Trade gains broker_order_id (migration f6a7b8c9d0e1); executor stamps the entry order id - new api_gateway trade-reconcile loop: books a closing SELL + realized P&L when a bracket leg fills (idempotent on the leg order id), syncs PENDING->terminal status, logs drift; runs alongside portfolio_sync [ci skip] Co-Authored-By: Claude Opus 4.8 --- .../f6a7b8c9d0e1_add_trade_broker_order_id.py | 38 ++ backtester/simulated_broker.py | 22 + services/api_gateway/main.py | 16 +- services/api_gateway/tasks/trade_reconcile.py | 219 +++++++++ services/trade_executor/main.py | 1 + shared/broker/alpaca_broker.py | 51 ++ shared/broker/base.py | 50 +- shared/models/trading.py | 1 + shared/schemas/trading.py | 12 + tests/services/test_trade_executor.py | 28 ++ tests/services/test_trade_reconcile.py | 434 ++++++++++++++++++ tests/test_broker.py | 160 ++++++- tests/test_models.py | 25 + 13 files changed, 1049 insertions(+), 8 deletions(-) create mode 100644 alembic/versions/f6a7b8c9d0e1_add_trade_broker_order_id.py create mode 100644 services/api_gateway/tasks/trade_reconcile.py create mode 100644 tests/services/test_trade_reconcile.py diff --git a/alembic/versions/f6a7b8c9d0e1_add_trade_broker_order_id.py b/alembic/versions/f6a7b8c9d0e1_add_trade_broker_order_id.py new file mode 100644 index 0000000..6c4e5ac --- /dev/null +++ b/alembic/versions/f6a7b8c9d0e1_add_trade_broker_order_id.py @@ -0,0 +1,38 @@ +"""add broker_order_id column to trades. + +Links a local Trade row to its Alpaca order id so the reconciliation task +can fetch the bracket order (and its stop-loss / take-profit legs) and book +auto-closes that never pass through our executor. Nullable so existing rows +and manually-created trades keep loading. Indexed for the by-order lookup. + +Revision ID: f6a7b8c9d0e1 +Revises: e5f6a7b8c9d0 +Create Date: 2026-06-04 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +revision = "f6a7b8c9d0e1" +down_revision = "e5f6a7b8c9d0" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "trades", + sa.Column("broker_order_id", sa.String(length=64), nullable=True), + ) + op.create_index( + "ix_trades_broker_order_id", + "trades", + ["broker_order_id"], + ) + + +def downgrade() -> None: + op.drop_index("ix_trades_broker_order_id", table_name="trades") + op.drop_column("trades", "broker_order_id") diff --git a/backtester/simulated_broker.py b/backtester/simulated_broker.py index 3237268..eb6a346 100644 --- a/backtester/simulated_broker.py +++ b/backtester/simulated_broker.py @@ -13,6 +13,7 @@ from datetime import datetime, timezone from shared.broker.base import BaseBroker from shared.schemas.trading import ( AccountInfo, + BrokerOrder, OrderRequest, OrderResult, OrderSide, @@ -197,6 +198,27 @@ class SimulatedBroker(BaseBroker): timestamp=datetime.now(tz=timezone.utc), ) + async def get_order( + self, order_id: str, *, nested: bool = True + ) -> BrokerOrder | None: + """Return a leg-less FILLED order — simulation has no bracket legs.""" + return BrokerOrder( + order_id=order_id, + ticker="", + side=OrderSide.BUY, + qty=0, + filled_price=0.0, + status=OrderStatus.FILLED, + timestamp=datetime.now(tz=timezone.utc), + legs=[], + ) + + async def list_orders( + self, *, status: str = "all", limit: int = 100 + ) -> list[OrderResult]: + """No standing-order book in simulation — return nothing.""" + return [] + # ------------------------------------------------------------------ # Extra backtest-only methods # ------------------------------------------------------------------ diff --git a/services/api_gateway/main.py b/services/api_gateway/main.py index 9c40f4d..38104c2 100644 --- a/services/api_gateway/main.py +++ b/services/api_gateway/main.py @@ -46,20 +46,26 @@ def create_app(config: ApiGatewayConfig | None = None) -> FastAPI: # Start portfolio sync background task from services.api_gateway.tasks.portfolio_sync import portfolio_sync_loop + from services.api_gateway.tasks.trade_reconcile import trade_reconcile_loop sync_task = asyncio.create_task( portfolio_sync_loop(config, session_factory) ) + reconcile_task = asyncio.create_task( + trade_reconcile_loop(config, session_factory) + ) logger.info("API Gateway started") yield - # Cancel the sync task + # Cancel the background tasks sync_task.cancel() - try: - await sync_task - except asyncio.CancelledError: - pass + reconcile_task.cancel() + for task in (sync_task, reconcile_task): + try: + await task + except asyncio.CancelledError: + pass # Cleanup await app.state.redis.aclose() diff --git a/services/api_gateway/tasks/trade_reconcile.py b/services/api_gateway/tasks/trade_reconcile.py new file mode 100644 index 0000000..22118c7 --- /dev/null +++ b/services/api_gateway/tasks/trade_reconcile.py @@ -0,0 +1,219 @@ +"""Background task that reconciles local Kevin trades against Alpaca. + +A Kevin bracket entry places three orders at Alpaca: the entry (parent) plus a +stop-loss and a take-profit child leg. When a leg fills automatically at +Alpaca, that close never passes through our executor — so locally there is no +closing Trade row and no booked P&L. + +This task closes the gap. For each OPEN local Kevin entry (FILLED BUY with a +``broker_order_id`` and no closing trade yet) it fetches the order via +``broker.get_order(broker_order_id, nested=True)``. If a stop-loss or +take-profit leg has FILLED, it books the close locally (a SELL Trade carrying +realized P&L), idempotently — the closing trade's ``broker_order_id`` is the +filled leg's order id, so a leg is never double-booked. It also promotes +non-terminal local statuses (PENDING -> FILLED/REJECTED/CANCELLED) from the +parent order, and logs drift it cannot auto-resolve without crashing the loop. + +Runs on the same cadence as the portfolio-sync task during market hours. +""" + +from __future__ import annotations + +import asyncio +import logging + +from sqlalchemy import and_, select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from services.api_gateway.config import ApiGatewayConfig +from services.api_gateway.tasks.portfolio_sync import is_market_open +from shared.broker.alpaca_broker import AlpacaBroker +from shared.broker.base import BaseBroker +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.models.trading import Trade, TradeSide, TradeStatus +from shared.schemas.trading import BrokerOrder, OrderResult, OrderStatus + +logger = logging.getLogger(__name__) + +_STATUS_MAP: dict[OrderStatus, TradeStatus] = { + OrderStatus.PENDING: TradeStatus.PENDING, + OrderStatus.FILLED: TradeStatus.FILLED, + OrderStatus.CANCELLED: TradeStatus.CANCELLED, + OrderStatus.REJECTED: TradeStatus.REJECTED, +} + + +def _filled_leg(order: BrokerOrder) -> OrderResult | None: + """Return the first FILLED child leg with a fill price, or ``None``.""" + for leg in order.legs: + if leg.status == OrderStatus.FILLED and leg.filled_price is not None: + return leg + return None + + +async def _already_booked(session: AsyncSession, leg_order_id: str) -> bool: + """True if a Trade already carries this leg's order id (dedup guard).""" + existing = ( + await session.execute( + select(Trade).where(Trade.broker_order_id == leg_order_id) + ) + ).scalar_one_or_none() + return existing is not None + + +async def _reconcile_trade( + session: AsyncSession, entry: Trade, order: BrokerOrder +) -> bool: + """Apply one trade's reconciliation. + + Returns ``True`` if a closing trade was booked. Books an auto-close when a + stop-loss / take-profit leg has filled (idempotent on the leg order id); + otherwise syncs a non-terminal local status from the parent order. + """ + leg = _filled_leg(order) + if leg is not None and leg.filled_price is not None: + if await _already_booked(session, leg.order_id): + return False + fill_price = leg.filled_price + pnl = (fill_price - entry.price) * leg.qty + session.add( + Trade( + ticker=entry.ticker, + side=TradeSide.SELL, + qty=leg.qty, + price=fill_price, + status=TradeStatus.FILLED, + strategy_id=KEVIN_STRATEGY_UUID, + signal_id=entry.signal_id, + broker_order_id=leg.order_id, + pnl=pnl, + ) + ) + logger.info( + "Reconciled auto-close for %s: leg %s filled @ %.2f, pnl=%.2f", + entry.ticker, + leg.order_id, + leg.filled_price, + pnl, + ) + return True + + # No filled exit leg — sync a non-terminal local status from the parent. + if entry.status == TradeStatus.PENDING: + mapped = _STATUS_MAP.get(order.status, TradeStatus.PENDING) + if mapped != entry.status: + entry.status = mapped + if order.filled_price is not None: + entry.price = order.filled_price + logger.info( + "Reconciled status for %s (%s) -> %s", + entry.ticker, + entry.broker_order_id, + mapped.value, + ) + return False + + +async def reconcile_once( + broker: BaseBroker, + session_factory: async_sessionmaker, +) -> None: + """Perform a single reconciliation cycle over open Kevin entries.""" + async with session_factory() as session: + open_entries = ( + ( + await session.execute( + select(Trade).where( + and_( + Trade.strategy_id == KEVIN_STRATEGY_UUID, + Trade.side == TradeSide.BUY, + Trade.status.in_( + [TradeStatus.FILLED, TradeStatus.PENDING] + ), + Trade.broker_order_id.isnot(None), + ) + ) + ) + ) + .scalars() + .all() + ) + + booked = 0 + for entry in open_entries: + broker_order_id = entry.broker_order_id + if broker_order_id is None: + continue + try: + order = await broker.get_order(broker_order_id, nested=True) + except Exception: + logger.exception( + "reconcile: get_order failed for %s (%s) — skipping row", + entry.ticker, + broker_order_id, + ) + continue + if order is None: + logger.warning( + "reconcile: Alpaca order %s for %s is missing — cannot " + "auto-resolve", + entry.broker_order_id, + entry.ticker, + ) + continue + try: + if await _reconcile_trade(session, entry, order): + booked += 1 + except Exception: + logger.exception( + "reconcile: booking failed for %s (%s) — skipping row", + entry.ticker, + entry.broker_order_id, + ) + + await session.commit() + + logger.info("Trade reconcile complete: open=%d booked=%d", len(open_entries), booked) + + +async def trade_reconcile_loop( + config: ApiGatewayConfig, + session_factory: async_sessionmaker, +) -> None: + """Run the reconcile loop until cancelled. + + Mirrors ``portfolio_sync_loop``: skips outside market hours, never crashes + on a single cycle's error, and disables itself when Alpaca credentials are + absent. + """ + if not config.alpaca_api_key or not config.alpaca_secret_key: + logger.warning( + "Alpaca API credentials not configured — trade reconcile disabled" + ) + return + + broker = AlpacaBroker( + api_key=config.alpaca_api_key, + secret_key=config.alpaca_secret_key, + paper=config.paper_trading, + ) + + logger.info( + "Trade reconcile started (interval=%ds, paper=%s)", + config.snapshot_interval_seconds, + config.paper_trading, + ) + + while True: + try: + if is_market_open(): + await reconcile_once(broker, session_factory) + else: + logger.debug("Market closed — skipping trade reconcile") + except asyncio.CancelledError: + logger.info("Trade reconcile task cancelled — shutting down") + raise + except Exception: + logger.exception("Trade reconcile error — will retry next interval") + + await asyncio.sleep(config.snapshot_interval_seconds) diff --git a/services/trade_executor/main.py b/services/trade_executor/main.py index 3490dac..f70775e 100644 --- a/services/trade_executor/main.py +++ b/services/trade_executor/main.py @@ -256,6 +256,7 @@ async def process_signal( strategy_id=signal.strategy_id, status=status_map.get(result.status, TradeStatusModel.PENDING), pnl=realized_pnl, + broker_order_id=result.order_id or None, ) session.add(db_trade) await session.commit() diff --git a/shared/broker/alpaca_broker.py b/shared/broker/alpaca_broker.py index fbaaad6..68d7f59 100644 --- a/shared/broker/alpaca_broker.py +++ b/shared/broker/alpaca_broker.py @@ -11,6 +11,7 @@ import asyncio import logging from datetime import datetime, timezone from decimal import Decimal +from typing import cast from alpaca.common.exceptions import APIError from alpaca.trading.client import TradingClient @@ -21,7 +22,10 @@ from alpaca.trading.enums import TimeInForce from alpaca.trading.models import Order as AlpacaOrder from alpaca.trading.models import Position as AlpacaPosition from alpaca.trading.models import TradeAccount +from alpaca.trading.enums import QueryOrderStatus from alpaca.trading.requests import ( + GetOrderByIdRequest, + GetOrdersRequest, LimitOrderRequest, MarketOrderRequest, StopLossRequest, @@ -32,6 +36,7 @@ from alpaca.trading.requests import ( from shared.broker.base import BaseBroker from shared.schemas.trading import ( AccountInfo, + BrokerOrder, OrderRequest, OrderResult, OrderSide, @@ -180,6 +185,14 @@ class AlpacaBroker(BaseBroker): timestamp=timestamp, ) + @classmethod + def _order_to_broker_order(cls, alpaca_order: AlpacaOrder) -> BrokerOrder: + """Convert an Alpaca ``Order`` (with optional nested legs) to a + ``BrokerOrder`` exposing each child leg as an ``OrderResult``.""" + base = cls._order_to_result(alpaca_order) + legs = [cls._order_to_result(leg) for leg in (alpaca_order.legs or [])] + return BrokerOrder(**base.model_dump(), legs=legs) + @staticmethod def _position_to_info(pos: AlpacaPosition) -> PositionInfo: """Convert an Alpaca ``Position`` to our ``PositionInfo``.""" @@ -260,6 +273,44 @@ class AlpacaBroker(BaseBroker): ) return self._order_to_result(alpaca_order) + async def get_order( + self, order_id: str, *, nested: bool = True + ) -> BrokerOrder | None: + """Fetch an order with its bracket child legs from Alpaca. + + Returns ``None`` if Alpaca does not know the order (404 -> APIError) + so reconciliation can log the drift instead of crashing. + """ + try: + # raw_data defaults to False, so the client returns an Order, not + # the dict the SDK's union signature also allows. + alpaca_order = cast( + AlpacaOrder, + await asyncio.to_thread( + self._client.get_order_by_id, + order_id, + GetOrderByIdRequest(nested=nested), + ), + ) + except APIError as exc: + logger.warning("Order %s not found at Alpaca: %s", order_id, exc) + return None + return self._order_to_broker_order(alpaca_order) + + async def list_orders( + self, *, status: str = "all", limit: int = 100 + ) -> list[OrderResult]: + """List orders from Alpaca, mapped to ``OrderResult``.""" + request = GetOrdersRequest( + status=QueryOrderStatus(status), + limit=limit, + ) + orders = cast( + "list[AlpacaOrder]", + await asyncio.to_thread(self._client.get_orders, request), + ) + return [self._order_to_result(o) for o in orders] + async def is_asset_tradable(self, symbol: str) -> bool: """Return True iff Alpaca lists *symbol* as tradable. diff --git a/shared/broker/base.py b/shared/broker/base.py index 7ed6ce8..bb84d42 100644 --- a/shared/broker/base.py +++ b/shared/broker/base.py @@ -9,7 +9,13 @@ changing strategy or execution logic. from abc import ABC, abstractmethod -from shared.schemas.trading import AccountInfo, OrderRequest, OrderResult, PositionInfo +from shared.schemas.trading import ( + AccountInfo, + BrokerOrder, + OrderRequest, + OrderResult, + PositionInfo, +) class BaseBroker(ABC): @@ -85,3 +91,45 @@ class BaseBroker(ABC): Current state of the order including fill price if applicable. """ ... + + @abstractmethod + async def get_order( + self, order_id: str, *, nested: bool = True + ) -> BrokerOrder | None: + """Fetch an order including its bracket child legs. + + Parameters + ---------- + order_id: + The brokerage-assigned order identifier. + nested: + When ``True`` (the default), child legs (stop-loss / take-profit) + are populated on the returned :class:`BrokerOrder` so callers can + tell which leg filled and at what price. + + Returns + ------- + BrokerOrder | None + The order with its legs, or ``None`` if the order does not exist. + """ + ... + + @abstractmethod + async def list_orders( + self, *, status: str = "all", limit: int = 100 + ) -> list[OrderResult]: + """List orders, optionally filtered by status. + + Parameters + ---------- + status: + One of ``"open"``, ``"closed"``, or ``"all"`` (the default). + limit: + Maximum number of orders to return. + + Returns + ------- + list[OrderResult] + One entry per matching order. + """ + ... diff --git a/shared/models/trading.py b/shared/models/trading.py index 0e6eeb5..fb66cdc 100644 --- a/shared/models/trading.py +++ b/shared/models/trading.py @@ -93,6 +93,7 @@ class Trade(TimestampMixin, Base): ) status: Mapped[TradeStatus] = mapped_column(nullable=False, default=TradeStatus.PENDING) pnl: Mapped[float | None] = mapped_column(Float, nullable=True) + broker_order_id: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True) # Relationships strategy: Mapped[Strategy | None] = relationship(back_populates="trades") diff --git a/shared/schemas/trading.py b/shared/schemas/trading.py index 7cca4d5..3462813 100644 --- a/shared/schemas/trading.py +++ b/shared/schemas/trading.py @@ -80,6 +80,18 @@ class OrderResult(BaseModel): model_config = {"from_attributes": True} +class BrokerOrder(OrderResult): + """An order plus its bracket child legs. + + Returned by ``BaseBroker.get_order`` so reconciliation can inspect a + bracket's stop-loss / take-profit legs (each an ``OrderResult``) to learn + which leg filled and at what price. Simple orders carry an empty + ``legs`` list. + """ + + legs: list[OrderResult] = Field(default_factory=list) + + class PositionInfo(BaseModel): """Current position state — used in API responses and portfolio views.""" diff --git a/tests/services/test_trade_executor.py b/tests/services/test_trade_executor.py index efe3870..6a962b9 100644 --- a/tests/services/test_trade_executor.py +++ b/tests/services/test_trade_executor.py @@ -912,6 +912,34 @@ class TestExecutorDBPersistence: assert trade_obj.ticker == "AAPL" assert trade_obj.signal_id == signal.signal_id + @pytest.mark.asyncio + async def test_entry_trade_records_broker_order_id(self): + """The persisted entry Trade must carry the Alpaca order id so + reconciliation can find the bracket order later.""" + config = _make_config() + broker = _mock_broker(positions=[], account=_make_account(100_000)) + publisher = AsyncMock() + publisher.publish = AsyncMock(return_value=b"1-0") + counters = { + "trades_executed": MagicMock(), + "rejections": MagicMock(), + "fill_latency": MagicMock(), + } + + signal = _make_kevin_signal(direction=SignalDirection.LONG) + mock_session = AsyncMock() + mock_session.add = MagicMock() + mock_session.commit = AsyncMock() + db_factory = _make_mock_db_session_factory(mock_session) + + with patch.object(RiskManager, "check_risk", return_value=(True, "approved")): + await process_signal( + signal, RiskManager(config, broker), broker, publisher, counters, db_factory + ) + + trade_obj = mock_session.add.call_args[0][0] + assert trade_obj.broker_order_id == "ord-123" + @pytest.mark.asyncio async def test_trade_not_persisted_without_db(self): """When db_session_factory is None, no DB write should happen.""" diff --git a/tests/services/test_trade_reconcile.py b/tests/services/test_trade_reconcile.py new file mode 100644 index 0000000..5880d9f --- /dev/null +++ b/tests/services/test_trade_reconcile.py @@ -0,0 +1,434 @@ +"""Tests for the trade reconciliation background task (Phase 4). + +A Kevin bracket entry places three orders at Alpaca: the entry (parent) plus a +stop-loss and a take-profit leg. When a leg fills automatically at Alpaca, the +close never passes through our executor — so locally there is no closing Trade +and no booked P&L. The reconcile task fetches each open entry's order via +``broker.get_order(broker_order_id, nested=True)``, detects a filled SL/TP leg, +and books the close (idempotently). It also syncs non-terminal local statuses +and logs drift it cannot auto-resolve. +""" + +from __future__ import annotations + +import asyncio +import uuid +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from services.api_gateway.config import ApiGatewayConfig +from services.api_gateway.tasks.trade_reconcile import ( + reconcile_once, + trade_reconcile_loop, +) +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.models.trading import Trade, TradeSide, TradeStatus +from shared.schemas.trading import ( + BrokerOrder, + OrderResult, + OrderSide, + OrderStatus, +) + +# --------------------------------------------------------------------------- +# Builders +# --------------------------------------------------------------------------- + + +def _entry_trade( + *, + ticker: str = "NVDA", + qty: float = 10.0, + price: float = 100.0, + broker_order_id: str = "parent-1", + status: TradeStatus = TradeStatus.FILLED, + signal_id: uuid.UUID | None = None, +) -> Trade: + return Trade( + id=uuid.uuid4(), + ticker=ticker, + side=TradeSide.BUY, + qty=qty, + price=price, + status=status, + strategy_id=KEVIN_STRATEGY_UUID, + signal_id=signal_id or uuid.uuid4(), + broker_order_id=broker_order_id, + pnl=None, + ) + + +def _leg( + *, + order_id: str, + status: OrderStatus, + filled_price: float | None, + qty: float = 10.0, + ticker: str = "NVDA", +) -> OrderResult: + return OrderResult( + order_id=order_id, + ticker=ticker, + side=OrderSide.SELL, + qty=qty, + filled_price=filled_price, + status=status, + timestamp=datetime.now(timezone.utc), + ) + + +def _bracket( + *, + parent_id: str = "parent-1", + parent_status: OrderStatus = OrderStatus.FILLED, + ticker: str = "NVDA", + qty: float = 10.0, + entry_price: float = 100.0, + legs: list[OrderResult] | None = None, +) -> BrokerOrder: + return BrokerOrder( + order_id=parent_id, + ticker=ticker, + side=OrderSide.BUY, + qty=qty, + filled_price=entry_price, + status=parent_status, + timestamp=datetime.now(timezone.utc), + legs=legs or [], + ) + + +class _FakeSession: + """Async session double driven by intent. + + ``open_trades`` is what the ``select(Trade).where(...open entries...)`` + query returns. ``existing_close_ids`` is the set of broker_order_ids + already booked as closing trades (drives the idempotency dedup query, + which is modelled as ``scalar_one_or_none``). New rows land in ``added``. + """ + + def __init__( + self, + open_trades: list[Trade], + existing_close_ids: set[str] | None = None, + ) -> None: + self._open_trades = open_trades + self._existing_close_ids = existing_close_ids or set() + self.added: list[Trade] = [] + self.committed = False + + async def __aenter__(self) -> "_FakeSession": + return self + + async def __aexit__(self, *exc) -> bool: + return False + + def add(self, obj: Trade) -> None: + self.added.append(obj) + # A freshly-booked close becomes visible to subsequent dedup lookups + # within the same reconcile pass. + if obj.broker_order_id: + self._existing_close_ids.add(obj.broker_order_id) + + async def commit(self) -> None: + self.committed = True + + async def execute(self, stmt): # noqa: ANN001 + text = str(stmt).lower() + # Dedup lookup is the only query with an equality on broker_order_id; + # the open-entries scan uses ``broker_order_id IS NOT NULL``. + if "broker_order_id =" in text: + params = stmt.compile().params + target = next( + (v for v in params.values() if isinstance(v, str)), + None, + ) + found = ( + _entry_trade(broker_order_id=target) + if target in self._existing_close_ids + else None + ) + return _Result(scalar=found) + # Open-entries scan + return _Result(rows=list(self._open_trades)) + + +class _Result: + def __init__(self, rows=None, scalar=None) -> None: # noqa: ANN001 + self._rows = rows or [] + self._scalar = scalar + + def scalars(self) -> "_Result": + return self + + def all(self) -> list: + return self._rows + + def scalar_one_or_none(self): # noqa: ANN001 + return self._scalar + + +def _factory(session: _FakeSession) -> MagicMock: + factory = MagicMock() + factory.return_value = session + return factory + + +def _config(**overrides) -> ApiGatewayConfig: + defaults = dict( + jwt_secret_key="test-secret-for-reconcile", + database_url="sqlite+aiosqlite:///:memory:", + redis_url="redis://localhost:6379/0", + alpaca_api_key="test-key", + alpaca_secret_key="test-secret", + paper_trading=True, + snapshot_interval_seconds=1, + ) + defaults.update(overrides) + return ApiGatewayConfig(**defaults) + + +# --------------------------------------------------------------------------- +# Booking auto-closes +# --------------------------------------------------------------------------- + + +class TestReconcileBooksClose: + async def test_books_close_on_take_profit_fill(self) -> None: + """TP leg FILLED @ 120 vs entry 100, qty 10 -> closing SELL, pnl=200.""" + entry = _entry_trade(broker_order_id="parent-1", price=100.0, qty=10.0) + tp = _leg(order_id="tp-1", status=OrderStatus.FILLED, filled_price=120.0) + sl = _leg(order_id="sl-1", status=OrderStatus.PENDING, filled_price=None) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[tp, sl]) + ) + session = _FakeSession([entry]) + + await reconcile_once(broker, _factory(session)) + + assert len(session.added) == 1 + close = session.added[0] + assert close.side == TradeSide.SELL + assert close.ticker == "NVDA" + assert close.qty == 10.0 + assert close.price == 120.0 + assert close.status == TradeStatus.FILLED + assert close.pnl == pytest.approx(200.0) + assert close.strategy_id == KEVIN_STRATEGY_UUID + assert close.signal_id == entry.signal_id + # Dedup key is the filled leg's order id + assert close.broker_order_id == "tp-1" + broker.get_order.assert_awaited_once_with("parent-1", nested=True) + + async def test_books_close_on_stop_loss_fill(self) -> None: + """SL leg FILLED @ 92 vs entry 100, qty 10 -> closing SELL, pnl=-80.""" + entry = _entry_trade(broker_order_id="parent-1", price=100.0, qty=10.0) + tp = _leg(order_id="tp-1", status=OrderStatus.PENDING, filled_price=None) + sl = _leg(order_id="sl-1", status=OrderStatus.FILLED, filled_price=92.0) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[tp, sl]) + ) + session = _FakeSession([entry]) + + await reconcile_once(broker, _factory(session)) + + assert len(session.added) == 1 + close = session.added[0] + assert close.price == 92.0 + assert close.pnl == pytest.approx(-80.0) + assert close.broker_order_id == "sl-1" + + async def test_noop_when_legs_unfilled(self) -> None: + """Both legs still open -> nothing booked.""" + entry = _entry_trade(broker_order_id="parent-1") + tp = _leg(order_id="tp-1", status=OrderStatus.PENDING, filled_price=None) + sl = _leg(order_id="sl-1", status=OrderStatus.PENDING, filled_price=None) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[tp, sl]) + ) + session = _FakeSession([entry]) + + await reconcile_once(broker, _factory(session)) + + assert session.added == [] + + +# --------------------------------------------------------------------------- +# Idempotency +# --------------------------------------------------------------------------- + + +class TestReconcileIdempotent: + async def test_does_not_double_book_filled_leg(self) -> None: + """If a closing trade already carries the filled leg's order id, a + second pass books nothing.""" + entry = _entry_trade(broker_order_id="parent-1", price=100.0, qty=10.0) + tp = _leg(order_id="tp-1", status=OrderStatus.FILLED, filled_price=120.0) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[tp]) + ) + # tp-1 already booked + session = _FakeSession([entry], existing_close_ids={"tp-1"}) + + await reconcile_once(broker, _factory(session)) + + assert session.added == [] + + async def test_running_twice_books_once(self) -> None: + """Two back-to-back reconcile passes over the same fixtures book the + close exactly once.""" + entry = _entry_trade(broker_order_id="parent-1", price=100.0, qty=10.0) + tp = _leg(order_id="tp-1", status=OrderStatus.FILLED, filled_price=120.0) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket(parent_id="parent-1", legs=[tp]) + ) + session = _FakeSession([entry]) + + await reconcile_once(broker, _factory(session)) + await reconcile_once(broker, _factory(session)) + + assert len(session.added) == 1 + + +# --------------------------------------------------------------------------- +# Status sync +# --------------------------------------------------------------------------- + + +class TestReconcileStatusSync: + async def test_pending_entry_promoted_to_filled(self) -> None: + """A local PENDING entry whose Alpaca parent is now FILLED is updated + in place (and not booked as a close).""" + entry = _entry_trade( + broker_order_id="parent-1", status=TradeStatus.PENDING, price=0.0 + ) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket( + parent_id="parent-1", + parent_status=OrderStatus.FILLED, + entry_price=101.5, + legs=[], + ) + ) + session = _FakeSession([entry]) + + await reconcile_once(broker, _factory(session)) + + assert entry.status == TradeStatus.FILLED + assert session.added == [] + + async def test_pending_entry_promoted_to_rejected(self) -> None: + entry = _entry_trade( + broker_order_id="parent-1", status=TradeStatus.PENDING, price=0.0 + ) + broker = AsyncMock() + broker.get_order = AsyncMock( + return_value=_bracket( + parent_id="parent-1", + parent_status=OrderStatus.REJECTED, + legs=[], + ) + ) + session = _FakeSession([entry]) + + await reconcile_once(broker, _factory(session)) + + assert entry.status == TradeStatus.REJECTED + + +# --------------------------------------------------------------------------- +# Drift handling +# --------------------------------------------------------------------------- + + +class TestReconcileDrift: + async def test_missing_alpaca_order_logs_warning_no_raise(self, caplog) -> None: # noqa: ANN001 + """broker.get_order returns None (order gone) -> warn, do not raise, + do not book.""" + entry = _entry_trade(broker_order_id="parent-gone") + broker = AsyncMock() + broker.get_order = AsyncMock(return_value=None) + session = _FakeSession([entry]) + + with caplog.at_level("WARNING"): + await reconcile_once(broker, _factory(session)) + + assert session.added == [] + assert any("parent-gone" in r.message or "missing" in r.message.lower() + for r in caplog.records) + + async def test_bad_row_does_not_abort_others(self) -> None: + """One trade whose get_order raises must not stop a sibling trade's + close from being booked.""" + bad = _entry_trade(ticker="BAD", broker_order_id="bad-1") + good = _entry_trade(ticker="NVDA", broker_order_id="good-1", price=100.0, qty=10.0) + + async def get_order(order_id, *, nested=True): # noqa: ANN001 + if order_id == "bad-1": + raise RuntimeError("alpaca blew up on this one") + tp = _leg(order_id="tp-good", status=OrderStatus.FILLED, filled_price=130.0) + return _bracket(parent_id="good-1", legs=[tp]) + + broker = AsyncMock() + broker.get_order = AsyncMock(side_effect=get_order) + session = _FakeSession([bad, good]) + + await reconcile_once(broker, _factory(session)) + + # The good one still books + assert len(session.added) == 1 + assert session.added[0].ticker == "NVDA" + assert session.added[0].broker_order_id == "tp-good" + + +# --------------------------------------------------------------------------- +# Loop wiring +# --------------------------------------------------------------------------- + + +class TestReconcileLoop: + async def test_no_credentials_returns_immediately(self) -> None: + cfg = _config(alpaca_api_key="", alpaca_secret_key="") + await asyncio.wait_for( + trade_reconcile_loop(cfg, MagicMock()), timeout=2.0 + ) + + async def test_error_does_not_crash_loop(self) -> None: + cfg = _config() + calls = 0 + + async def boom(broker, sf): # noqa: ANN001 + nonlocal calls + calls += 1 + if calls == 1: + raise ConnectionError("down") + + with ( + patch("services.api_gateway.tasks.trade_reconcile.AlpacaBroker") as MB, + patch( + "services.api_gateway.tasks.trade_reconcile.reconcile_once", + side_effect=boom, + ), + patch( + "services.api_gateway.tasks.trade_reconcile.is_market_open", + return_value=True, + ), + ): + MB.return_value = AsyncMock() + task = asyncio.create_task(trade_reconcile_loop(cfg, MagicMock())) + await asyncio.sleep(2.5) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert calls >= 2 diff --git a/tests/test_broker.py b/tests/test_broker.py index 81b515a..18ccbef 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -42,6 +42,8 @@ def _make_alpaca_order( filled_avg_price: str | None = None, status: AlpacaOrderStatus = AlpacaOrderStatus.NEW, submitted_at: datetime | None = None, + order_class: OrderClass = OrderClass.SIMPLE, + legs: list[AlpacaOrder] | None = None, ) -> AlpacaOrder: """Build a minimal Alpaca ``Order`` model for testing.""" oid = uuid.UUID(order_id) if order_id else uuid.uuid4() @@ -66,7 +68,7 @@ def _make_alpaca_order( qty=qty, filled_qty="0", filled_avg_price=filled_avg_price, - order_class=OrderClass.SIMPLE, + order_class=order_class, order_type=AlpacaOrderType.MARKET, type=AlpacaOrderType.MARKET, side=side, @@ -75,7 +77,7 @@ def _make_alpaca_order( stop_price=None, status=status, extended_hours=False, - legs=None, + legs=legs, trail_percent=None, trail_price=None, hwm=None, @@ -537,3 +539,157 @@ class TestBaseBrokerInterface: from shared.broker.base import BaseBroker assert BB is BaseBroker + + +# --------------------------------------------------------------------------- +# get_order — nested bracket parent + legs (Phase 4) +# --------------------------------------------------------------------------- + + +class TestGetOrderNested: + """``get_order`` returns a ``BrokerOrder`` exposing the parent plus its + bracket child legs (stop-loss / take-profit) with status + fill price, so + reconciliation can tell which leg filled and at what price.""" + + @pytest.mark.asyncio + async def test_get_order_returns_broker_order( + self, broker: AlpacaBroker, mock_client: MagicMock + ) -> None: + from shared.schemas.trading import BrokerOrder + + parent_id = str(uuid.uuid4()) + parent = _make_alpaca_order( + order_id=parent_id, + symbol="NVDA", + side=AlpacaOrderSide.BUY, + qty="10", + status=AlpacaOrderStatus.FILLED, + filled_avg_price="100.00", + order_class=OrderClass.BRACKET, + legs=[], + ) + mock_client.get_order_by_id.return_value = parent + + result = await broker.get_order(parent_id) + + assert isinstance(result, BrokerOrder) + assert result.order_id == parent_id + assert result.ticker == "NVDA" + assert result.status == OrderStatus.FILLED + assert result.filled_price == 100.00 + assert result.legs == [] + + @pytest.mark.asyncio + async def test_get_order_maps_take_profit_leg( + self, broker: AlpacaBroker, mock_client: MagicMock + ) -> None: + """A bracket parent with a FILLED take-profit leg exposes that leg with + its fill price; the stop-loss leg is reported still pending.""" + parent_id = str(uuid.uuid4()) + tp_id = str(uuid.uuid4()) + sl_id = str(uuid.uuid4()) + tp_leg = _make_alpaca_order( + order_id=tp_id, + symbol="NVDA", + side=AlpacaOrderSide.SELL, + qty="10", + status=AlpacaOrderStatus.FILLED, + filled_avg_price="120.00", + ) + sl_leg = _make_alpaca_order( + order_id=sl_id, + symbol="NVDA", + side=AlpacaOrderSide.SELL, + qty="10", + status=AlpacaOrderStatus.HELD, + ) + parent = _make_alpaca_order( + order_id=parent_id, + symbol="NVDA", + side=AlpacaOrderSide.BUY, + qty="10", + status=AlpacaOrderStatus.FILLED, + filled_avg_price="100.00", + order_class=OrderClass.BRACKET, + legs=[tp_leg, sl_leg], + ) + mock_client.get_order_by_id.return_value = parent + + result = await broker.get_order(parent_id) + + assert len(result.legs) == 2 + tp = next(leg for leg in result.legs if leg.order_id == tp_id) + sl = next(leg for leg in result.legs if leg.order_id == sl_id) + assert tp.status == OrderStatus.FILLED + assert tp.filled_price == 120.00 + assert tp.side == OrderSide.SELL + assert sl.status == OrderStatus.PENDING + assert sl.filled_price is None + + @pytest.mark.asyncio + async def test_get_order_passes_nested_flag( + self, broker: AlpacaBroker, mock_client: MagicMock + ) -> None: + """``nested=True`` must reach Alpaca as a GetOrderByIdRequest so the + legs come back populated.""" + from alpaca.trading.requests import GetOrderByIdRequest + + parent_id = str(uuid.uuid4()) + mock_client.get_order_by_id.return_value = _make_alpaca_order( + order_id=parent_id, order_class=OrderClass.BRACKET, legs=[] + ) + + await broker.get_order(parent_id, nested=True) + + call = mock_client.get_order_by_id.call_args + assert call.args[0] == parent_id + req = call.args[1] if len(call.args) > 1 else call.kwargs.get("filter") + assert isinstance(req, GetOrderByIdRequest) + assert req.nested is True + + @pytest.mark.asyncio + async def test_get_order_returns_none_on_missing( + self, broker: AlpacaBroker, mock_client: MagicMock + ) -> None: + """A missing order (Alpaca 404 -> APIError) yields ``None`` rather than + raising, so reconciliation can log drift and move on.""" + mock_client.get_order_by_id.side_effect = APIError("order not found") + + result = await broker.get_order(str(uuid.uuid4())) + + assert result is None + + +# --------------------------------------------------------------------------- +# list_orders (Phase 4) +# --------------------------------------------------------------------------- + + +class TestListOrders: + @pytest.mark.asyncio + async def test_list_orders_maps_results( + self, broker: AlpacaBroker, mock_client: MagicMock + ) -> None: + o1 = _make_alpaca_order(symbol="AAPL", status=AlpacaOrderStatus.FILLED, filled_avg_price="1") + o2 = _make_alpaca_order(symbol="MSFT", status=AlpacaOrderStatus.NEW) + mock_client.get_orders.return_value = [o1, o2] + + results = await broker.list_orders() + + assert len(results) == 2 + assert {r.ticker for r in results} == {"AAPL", "MSFT"} + assert all(isinstance(r, OrderResult) for r in results) + + @pytest.mark.asyncio + async def test_list_orders_builds_request( + self, broker: AlpacaBroker, mock_client: MagicMock + ) -> None: + from alpaca.trading.requests import GetOrdersRequest + + mock_client.get_orders.return_value = [] + + await broker.list_orders(status="all", limit=25) + + req = mock_client.get_orders.call_args.args[0] + assert isinstance(req, GetOrdersRequest) + assert req.limit == 25 diff --git a/tests/test_models.py b/tests/test_models.py index e4726b0..99f825f 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -123,6 +123,31 @@ class TestTrade: assert t.status == TradeStatus.FILLED assert t.pnl == 250.50 + def test_trade_broker_order_id_optional(self) -> None: + """broker_order_id links a Trade back to its Alpaca order for + reconciliation; it is nullable (entry trades set it, manual rows may + not).""" + t = Trade( + id=uuid.uuid4(), + ticker="NVDA", + side=TradeSide.BUY, + qty=10.0, + price=100.0, + status=TradeStatus.FILLED, + broker_order_id="alpaca-ord-abc", + ) + assert t.broker_order_id == "alpaca-ord-abc" + + t2 = Trade( + id=uuid.uuid4(), + ticker="NVDA", + side=TradeSide.BUY, + qty=10.0, + price=100.0, + status=TradeStatus.FILLED, + ) + assert t2.broker_order_id is None + class TestPosition: def test_create_position(self) -> None: