trading/services/api_gateway/tasks/trade_reconcile.py

243 lines
8.6 KiB
Python
Raw Normal View History

"""Background task that reconciles local Kevin trades against Alpaca.
A Kevin bracket entry places three orders at Alpaca: the entry (parent) plus a
stop-loss and a take-profit child leg. When a leg fills automatically at
Alpaca, that close never passes through our executor so locally there is no
closing Trade row and no booked P&L.
This task closes the gap. For each OPEN local Kevin entry (FILLED BUY with a
``broker_order_id`` and no closing trade yet) it fetches the order via
``broker.get_order(broker_order_id, nested=True)``. If a stop-loss or
take-profit leg has FILLED, it books the close locally (a SELL Trade carrying
realized P&L). Booking is idempotent: the closing trade's ``broker_order_id``
is the filled leg's order id, so a leg is never double-booked. It also promotes
non-terminal local statuses (PENDING -> FILLED/REJECTED/CANCELLED) from the
parent order, and logs drift it cannot auto-resolve without crashing the loop.
Runs on the same cadence as the portfolio-sync task during market hours.
"""
from __future__ import annotations
import asyncio
import logging
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from services.api_gateway.config import ApiGatewayConfig
from services.api_gateway.tasks.portfolio_sync import is_market_open
from shared.broker.alpaca_broker import AlpacaBroker
from shared.broker.base import BaseBroker
from shared.constants.kevin import KEVIN_STRATEGY_UUID
from shared.models.trading import Trade, TradeSide, TradeStatus
from shared.schemas.trading import BrokerOrder, OrderResult, OrderStatus
from shared.slack_notifier import SlackNotifier
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Translation table from a broker-side order status to the local trade status.
# Only these four broker statuses are listed; the lookup in this file uses
# ``.get(..., TradeStatus.PENDING)``, so any status missing from the table
# leaves a PENDING entry untouched.
_STATUS_MAP: dict[OrderStatus, TradeStatus] = {
    OrderStatus.PENDING: TradeStatus.PENDING,
    OrderStatus.FILLED: TradeStatus.FILLED,
    OrderStatus.CANCELLED: TradeStatus.CANCELLED,
    OrderStatus.REJECTED: TradeStatus.REJECTED,
}
def _filled_leg(order: BrokerOrder) -> OrderResult | None:
    """Return the first FILLED child leg that has a fill price, or ``None``.

    Args:
        order: Parent order as returned by the broker, with any child legs
            available under ``order.legs``.

    Returns:
        The first leg, in ``order.legs`` order, whose status is
        ``OrderStatus.FILLED`` and whose ``filled_price`` is set. ``None``
        when no leg qualifies or the order carries no legs at all.
    """
    # Alpaca reports ``legs`` as null for orders without nested children.
    # In case the schema passes that through unchanged, treat a missing list
    # like an empty one instead of raising TypeError on iteration.
    for leg in order.legs or ():
        if leg.status == OrderStatus.FILLED and leg.filled_price is not None:
            return leg
    return None
async def _already_booked(session: AsyncSession, leg_order_id: str) -> bool:
    """Report whether any Trade row already carries this leg's order id.

    This is the dedup guard that keeps a filled leg from being booked twice.

    Args:
        session: Async database session used to run the lookup.
        leg_order_id: Broker order id of the filled child leg.

    Returns:
        ``True`` if at least one ``Trade`` has ``broker_order_id`` equal to
        ``leg_order_id``, otherwise ``False``.
    """
    # Existence probe: fetch at most one id column rather than a full entity.
    # ``scalar_one_or_none()`` would raise MultipleResultsFound if duplicates
    # were ever present, which is exactly when this guard must answer True.
    result = await session.execute(
        select(Trade.broker_order_id)
        .where(Trade.broker_order_id == leg_order_id)
        .limit(1)
    )
    return result.first() is not None
async def _reconcile_trade(
    session: AsyncSession, entry: Trade, order: BrokerOrder
) -> Trade | None:
    """Apply one entry's reconciliation against its broker order.

    Two steps, in this order:

    1. If the local entry is still PENDING, map the parent order's status
       through ``_STATUS_MAP`` (unknown statuses stay PENDING). When the
       status changes, the entry is updated in place, and its price is
       replaced by the parent's fill price when one is reported.
    2. If a child leg has FILLED with a fill price and no ``Trade`` already
       carries that leg's order id, a closing SELL ``Trade`` is added to the
       session with ``pnl = (leg fill price - entry price) * leg qty``.

    The status sync runs first so that an entry whose exit leg filled while
    it was still PENDING locally is not left PENDING, and so that P&L is
    computed from the entry's synced fill price.

    Args:
        session: Async database session; the closing trade is added to it but
            not committed here.
        entry: Local entry trade being reconciled (mutated in step 1).
        order: Broker view of the entry's order, including nested legs.

    Returns:
        The newly added closing ``Trade``, or ``None`` when nothing was
        booked (no filled leg, or that leg was already booked).
    """
    # Step 1: promote a non-terminal local status from the parent order.
    if entry.status == TradeStatus.PENDING:
        mapped = _STATUS_MAP.get(order.status, TradeStatus.PENDING)
        if mapped != entry.status:
            entry.status = mapped
            if order.filled_price is not None:
                entry.price = order.filled_price
            logger.info(
                "Reconciled status for %s (%s) -> %s",
                entry.ticker,
                entry.broker_order_id,
                mapped.value,
            )

    # Step 2: book the auto-close if a stop-loss / take-profit leg filled.
    leg = _filled_leg(order)
    # The filled_price check repeats what _filled_leg guarantees; it is kept
    # so the Optional is narrowed for the arithmetic below.
    if leg is None or leg.filled_price is None:
        return None

    # Idempotency: the closing trade is keyed on the leg's own order id.
    if await _already_booked(session, leg.order_id):
        return None

    fill_price = leg.filled_price
    pnl = (fill_price - entry.price) * leg.qty
    close = Trade(
        ticker=entry.ticker,
        side=TradeSide.SELL,
        qty=leg.qty,
        price=fill_price,
        status=TradeStatus.FILLED,
        strategy_id=KEVIN_STRATEGY_UUID,
        signal_id=entry.signal_id,
        broker_order_id=leg.order_id,
        pnl=pnl,
    )
    session.add(close)
    logger.info(
        "Reconciled auto-close for %s: leg %s filled @ %.2f, pnl=%.2f",
        entry.ticker,
        leg.order_id,
        fill_price,
        pnl,
    )
    return close
async def reconcile_once(
    broker: BaseBroker,
    session_factory: async_sessionmaker,
    notifier: SlackNotifier | None = None,
) -> None:
    """Perform a single reconciliation cycle over local Kevin entries.

    Loads every ``Trade`` that belongs to the Kevin strategy, is a BUY, is
    FILLED or PENDING, and has a ``broker_order_id``. For each one it fetches
    the broker order (with nested legs) and hands it to ``_reconcile_trade``.
    All changes are committed once, at the end of the cycle. Close
    notifications are sent only after that commit succeeds, so a failed
    commit cannot announce closes that were never persisted.

    Per-row failures (broker fetch, missing order, booking error) are logged
    and the row is skipped. A notification failure is logged and ignored.

    Args:
        broker: Broker client used to fetch orders.
        session_factory: Factory producing the async session for this cycle.
        notifier: Optional notifier; when given, ``notify_close`` is awaited
            once per booked close.

    Raises:
        Exception: Whatever the initial query or the final commit raises;
            those errors are not handled here.
    """
    # Plain-value payloads captured at booking time, so nothing has to touch
    # ORM objects after the commit / session close.
    close_payloads: list[dict] = []

    async with session_factory() as session:
        # NOTE(review): this filter does not exclude entries that already
        # have a closing trade; those rows are still fetched from the broker
        # each cycle and rely on the dedup guard in _reconcile_trade.
        query = select(Trade).where(
            and_(
                Trade.strategy_id == KEVIN_STRATEGY_UUID,
                Trade.side == TradeSide.BUY,
                Trade.status.in_([TradeStatus.FILLED, TradeStatus.PENDING]),
                Trade.broker_order_id.isnot(None),
            )
        )
        open_entries = (await session.execute(query)).scalars().all()

        for entry in open_entries:
            broker_order_id = entry.broker_order_id
            if broker_order_id is None:
                # Already excluded by the query; kept for type narrowing.
                continue

            try:
                order = await broker.get_order(broker_order_id, nested=True)
            except Exception:
                logger.exception(
                    "reconcile: get_order failed for %s (%s) — skipping row",
                    entry.ticker,
                    broker_order_id,
                )
                continue

            if order is None:
                logger.warning(
                    "reconcile: Alpaca order %s for %s is missing — cannot "
                    "auto-resolve",
                    broker_order_id,
                    entry.ticker,
                )
                continue

            try:
                close = await _reconcile_trade(session, entry, order)
            except Exception:
                logger.exception(
                    "reconcile: booking failed for %s (%s) — skipping row",
                    entry.ticker,
                    broker_order_id,
                )
                continue

            if close is not None:
                close_payloads.append(
                    {
                        "ticker": close.ticker,
                        "qty": close.qty,
                        "price": close.price,
                        "pnl": close.pnl or 0.0,
                        "strategy_id": close.strategy_id,
                    }
                )

        await session.commit()

    if notifier is not None:
        for payload in close_payloads:
            # Slack is an observer — its failure must not affect the cycle.
            try:
                await notifier.notify_close(
                    reason="bracket leg filled at broker",
                    **payload,
                )
            except Exception:
                logger.warning(
                    "reconcile: close notification failed", exc_info=True
                )

    logger.info(
        "Trade reconcile complete: open=%d booked=%d",
        len(open_entries),
        len(close_payloads),
    )
async def trade_reconcile_loop(
    config: ApiGatewayConfig,
    session_factory: async_sessionmaker,
) -> None:
    """Run the reconcile loop until cancelled.

    Each iteration runs ``reconcile_once`` when ``is_market_open()`` is true,
    otherwise logs a debug line. It then sleeps for
    ``config.snapshot_interval_seconds``. An error inside one cycle is logged
    and the loop carries on. The task returns immediately, after a warning,
    when either Alpaca credential is missing from ``config``.

    Args:
        config: Gateway configuration supplying Alpaca credentials, the
            paper-trading flag, Slack settings and the loop interval.
        session_factory: Factory handed to ``reconcile_once`` for DB sessions.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the task is
            cancelled, whether mid-cycle or while sleeping.
    """
    if not config.alpaca_api_key or not config.alpaca_secret_key:
        logger.warning(
            "Alpaca API credentials not configured — trade reconcile disabled"
        )
        return

    broker = AlpacaBroker(
        api_key=config.alpaca_api_key,
        secret_key=config.alpaca_secret_key,
        paper=config.paper_trading,
    )
    notifier = SlackNotifier(
        webhook_url=config.slack_webhook_url,
        bot_token=config.slack_bot_token,
        channel=config.slack_channel,
    )
    logger.info(
        "Trade reconcile started (interval=%ds, paper=%s, slack=%s)",
        config.snapshot_interval_seconds,
        config.paper_trading,
        "on" if notifier.enabled else "off",
    )

    # The cancellation handler wraps the whole loop, including the sleep, so
    # the shutdown message is logged wherever the cancel lands.
    try:
        while True:
            try:
                if is_market_open():
                    await reconcile_once(
                        broker, session_factory, notifier=notifier
                    )
                else:
                    logger.debug("Market closed — skipping trade reconcile")
            except asyncio.CancelledError:
                # Never let the broad handler below absorb a cancellation
                # (matters on runtimes where it derives from Exception).
                raise
            except Exception:
                logger.exception(
                    "Trade reconcile error — will retry next interval"
                )
            await asyncio.sleep(config.snapshot_interval_seconds)
    except asyncio.CancelledError:
        logger.info("Trade reconcile task cancelled — shutting down")
        raise