feat(kevin): reconcile Alpaca bracket auto-closes + order status
Bracket stop-loss/take-profit legs fill at Alpaca without passing through the executor, so those closes (and their P&L) were invisible locally. - broker: add get_order(nested) + list_orders to BaseBroker/AlpacaBroker (+ SimulatedBroker); BrokerOrder carries child legs - Trade gains broker_order_id (migration f6a7b8c9d0e1); executor stamps the entry order id - new api_gateway trade-reconcile loop: books a closing SELL + realized P&L when a bracket leg fills (idempotent on the leg order id), syncs PENDING->terminal status, logs drift; runs alongside portfolio_sync [ci skip] Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
52b3c76482
commit
82dc622544
13 changed files with 1049 additions and 8 deletions
|
|
@ -46,20 +46,26 @@ def create_app(config: ApiGatewayConfig | None = None) -> FastAPI:
|
|||
|
||||
# Start portfolio sync background task
|
||||
from services.api_gateway.tasks.portfolio_sync import portfolio_sync_loop
|
||||
from services.api_gateway.tasks.trade_reconcile import trade_reconcile_loop
|
||||
|
||||
sync_task = asyncio.create_task(
|
||||
portfolio_sync_loop(config, session_factory)
|
||||
)
|
||||
reconcile_task = asyncio.create_task(
|
||||
trade_reconcile_loop(config, session_factory)
|
||||
)
|
||||
|
||||
logger.info("API Gateway started")
|
||||
yield
|
||||
|
||||
# Cancel the sync task
|
||||
# Cancel the background tasks
|
||||
sync_task.cancel()
|
||||
try:
|
||||
await sync_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
reconcile_task.cancel()
|
||||
for task in (sync_task, reconcile_task):
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
# Cleanup
|
||||
await app.state.redis.aclose()
|
||||
|
|
|
|||
219
services/api_gateway/tasks/trade_reconcile.py
Normal file
219
services/api_gateway/tasks/trade_reconcile.py
Normal file
|
|
@ -0,0 +1,219 @@
|
|||
"""Background task that reconciles local Kevin trades against Alpaca.
|
||||
|
||||
A Kevin bracket entry places three orders at Alpaca: the entry (parent) plus a
|
||||
stop-loss and a take-profit child leg. When a leg fills automatically at
|
||||
Alpaca, that close never passes through our executor — so locally there is no
|
||||
closing Trade row and no booked P&L.
|
||||
|
||||
This task closes the gap. For each OPEN local Kevin entry (FILLED BUY with a
|
||||
``broker_order_id`` and no closing trade yet) it fetches the order via
|
||||
``broker.get_order(broker_order_id, nested=True)``. If a stop-loss or
|
||||
take-profit leg has FILLED, it books the close locally (a SELL Trade carrying
|
||||
realized P&L), idempotently — the closing trade's ``broker_order_id`` is the
|
||||
filled leg's order id, so a leg is never double-booked. It also promotes
|
||||
non-terminal local statuses (PENDING -> FILLED/REJECTED/CANCELLED) from the
|
||||
parent order, and logs drift it cannot auto-resolve without crashing the loop.
|
||||
|
||||
Runs on the same cadence as the portfolio-sync task during market hours.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
|
||||
from services.api_gateway.config import ApiGatewayConfig
|
||||
from services.api_gateway.tasks.portfolio_sync import is_market_open
|
||||
from shared.broker.alpaca_broker import AlpacaBroker
|
||||
from shared.broker.base import BaseBroker
|
||||
from shared.constants.kevin import KEVIN_STRATEGY_UUID
|
||||
from shared.models.trading import Trade, TradeSide, TradeStatus
|
||||
from shared.schemas.trading import BrokerOrder, OrderResult, OrderStatus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_STATUS_MAP: dict[OrderStatus, TradeStatus] = {
|
||||
OrderStatus.PENDING: TradeStatus.PENDING,
|
||||
OrderStatus.FILLED: TradeStatus.FILLED,
|
||||
OrderStatus.CANCELLED: TradeStatus.CANCELLED,
|
||||
OrderStatus.REJECTED: TradeStatus.REJECTED,
|
||||
}
|
||||
|
||||
|
||||
def _filled_leg(order: BrokerOrder) -> OrderResult | None:
|
||||
"""Return the first FILLED child leg with a fill price, or ``None``."""
|
||||
for leg in order.legs:
|
||||
if leg.status == OrderStatus.FILLED and leg.filled_price is not None:
|
||||
return leg
|
||||
return None
|
||||
|
||||
|
||||
async def _already_booked(session: AsyncSession, leg_order_id: str) -> bool:
|
||||
"""True if a Trade already carries this leg's order id (dedup guard)."""
|
||||
existing = (
|
||||
await session.execute(
|
||||
select(Trade).where(Trade.broker_order_id == leg_order_id)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
return existing is not None
|
||||
|
||||
|
||||
async def _reconcile_trade(
|
||||
session: AsyncSession, entry: Trade, order: BrokerOrder
|
||||
) -> bool:
|
||||
"""Apply one trade's reconciliation.
|
||||
|
||||
Returns ``True`` if a closing trade was booked. Books an auto-close when a
|
||||
stop-loss / take-profit leg has filled (idempotent on the leg order id);
|
||||
otherwise syncs a non-terminal local status from the parent order.
|
||||
"""
|
||||
leg = _filled_leg(order)
|
||||
if leg is not None and leg.filled_price is not None:
|
||||
if await _already_booked(session, leg.order_id):
|
||||
return False
|
||||
fill_price = leg.filled_price
|
||||
pnl = (fill_price - entry.price) * leg.qty
|
||||
session.add(
|
||||
Trade(
|
||||
ticker=entry.ticker,
|
||||
side=TradeSide.SELL,
|
||||
qty=leg.qty,
|
||||
price=fill_price,
|
||||
status=TradeStatus.FILLED,
|
||||
strategy_id=KEVIN_STRATEGY_UUID,
|
||||
signal_id=entry.signal_id,
|
||||
broker_order_id=leg.order_id,
|
||||
pnl=pnl,
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"Reconciled auto-close for %s: leg %s filled @ %.2f, pnl=%.2f",
|
||||
entry.ticker,
|
||||
leg.order_id,
|
||||
leg.filled_price,
|
||||
pnl,
|
||||
)
|
||||
return True
|
||||
|
||||
# No filled exit leg — sync a non-terminal local status from the parent.
|
||||
if entry.status == TradeStatus.PENDING:
|
||||
mapped = _STATUS_MAP.get(order.status, TradeStatus.PENDING)
|
||||
if mapped != entry.status:
|
||||
entry.status = mapped
|
||||
if order.filled_price is not None:
|
||||
entry.price = order.filled_price
|
||||
logger.info(
|
||||
"Reconciled status for %s (%s) -> %s",
|
||||
entry.ticker,
|
||||
entry.broker_order_id,
|
||||
mapped.value,
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
async def reconcile_once(
|
||||
broker: BaseBroker,
|
||||
session_factory: async_sessionmaker,
|
||||
) -> None:
|
||||
"""Perform a single reconciliation cycle over open Kevin entries."""
|
||||
async with session_factory() as session:
|
||||
open_entries = (
|
||||
(
|
||||
await session.execute(
|
||||
select(Trade).where(
|
||||
and_(
|
||||
Trade.strategy_id == KEVIN_STRATEGY_UUID,
|
||||
Trade.side == TradeSide.BUY,
|
||||
Trade.status.in_(
|
||||
[TradeStatus.FILLED, TradeStatus.PENDING]
|
||||
),
|
||||
Trade.broker_order_id.isnot(None),
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
)
|
||||
|
||||
booked = 0
|
||||
for entry in open_entries:
|
||||
broker_order_id = entry.broker_order_id
|
||||
if broker_order_id is None:
|
||||
continue
|
||||
try:
|
||||
order = await broker.get_order(broker_order_id, nested=True)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"reconcile: get_order failed for %s (%s) — skipping row",
|
||||
entry.ticker,
|
||||
broker_order_id,
|
||||
)
|
||||
continue
|
||||
if order is None:
|
||||
logger.warning(
|
||||
"reconcile: Alpaca order %s for %s is missing — cannot "
|
||||
"auto-resolve",
|
||||
entry.broker_order_id,
|
||||
entry.ticker,
|
||||
)
|
||||
continue
|
||||
try:
|
||||
if await _reconcile_trade(session, entry, order):
|
||||
booked += 1
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"reconcile: booking failed for %s (%s) — skipping row",
|
||||
entry.ticker,
|
||||
entry.broker_order_id,
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
|
||||
logger.info("Trade reconcile complete: open=%d booked=%d", len(open_entries), booked)
|
||||
|
||||
|
||||
async def trade_reconcile_loop(
|
||||
config: ApiGatewayConfig,
|
||||
session_factory: async_sessionmaker,
|
||||
) -> None:
|
||||
"""Run the reconcile loop until cancelled.
|
||||
|
||||
Mirrors ``portfolio_sync_loop``: skips outside market hours, never crashes
|
||||
on a single cycle's error, and disables itself when Alpaca credentials are
|
||||
absent.
|
||||
"""
|
||||
if not config.alpaca_api_key or not config.alpaca_secret_key:
|
||||
logger.warning(
|
||||
"Alpaca API credentials not configured — trade reconcile disabled"
|
||||
)
|
||||
return
|
||||
|
||||
broker = AlpacaBroker(
|
||||
api_key=config.alpaca_api_key,
|
||||
secret_key=config.alpaca_secret_key,
|
||||
paper=config.paper_trading,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Trade reconcile started (interval=%ds, paper=%s)",
|
||||
config.snapshot_interval_seconds,
|
||||
config.paper_trading,
|
||||
)
|
||||
|
||||
while True:
|
||||
try:
|
||||
if is_market_open():
|
||||
await reconcile_once(broker, session_factory)
|
||||
else:
|
||||
logger.debug("Market closed — skipping trade reconcile")
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Trade reconcile task cancelled — shutting down")
|
||||
raise
|
||||
except Exception:
|
||||
logger.exception("Trade reconcile error — will retry next interval")
|
||||
|
||||
await asyncio.sleep(config.snapshot_interval_seconds)
|
||||
Loading…
Add table
Add a link
Reference in a new issue