feat(kevin): correct exits, realized P&L, wire exit scanner

- executor: EXIT/SELL signals close the FULL held broker position (not a target_dollars-sized fresh order) and skip when flat

- executor: book realized P&L on the closing trade ((fill - avg_entry)*qty) so the dashboard P&L + win-rate populate; entries leave pnl=None

- exit scanner: wired into the bridge run loop on kevin_bridge_exit_scan_cron (daily ET gate; croniter intentionally not a dependency) plus an offsetting-SELL guard so it only emits exits for currently-held tickers

[ci skip]

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-06-04 22:13:30 +00:00
parent a8b0d33bd1
commit 52b3c76482
7 changed files with 587 additions and 15 deletions

View file

@ -27,15 +27,26 @@ class ExitScanner:
session_factory: Callable[..., Any],
publisher: Any,
config: Any,
broker: Any,
) -> None:
self.session_factory = session_factory
self.publisher = publisher
self.config = config
self.broker = broker
async def scan_and_emit_exits(self) -> int:
"""Returns the number of EXIT signals emitted."""
now = datetime.now(timezone.utc)
emitted = 0
# Offsetting-SELL guard: only emit exits for tickers STILL held at the
# broker, so we never re-emit for an already-closed position. With zero
# open positions this set is empty → the scan is a safe no-op.
positions = await self.broker.get_positions()
held_tickers = {p.ticker for p in positions if p.qty != 0}
if not held_tickers:
return 0
async with self.session_factory() as session:
# Find open Kevin trades (FILLED, no closing trade yet on same ticker)
open_trades = (
@ -54,6 +65,9 @@ class ExitScanner:
)
for trade in open_trades:
# Skip tickers no longer held at the broker (already closed).
if trade.ticker not in held_tickers:
continue
# Find the source audit row to learn the original holding_days target
async with self.session_factory() as session:
audit = (

View file

@ -9,8 +9,10 @@ from __future__ import annotations
import asyncio
import logging
from datetime import date, datetime
from decimal import Decimal
from typing import Any
from zoneinfo import ZoneInfo
from shared.constants.kevin import KEVIN_STRATEGY_UUID
from shared.schemas.kevin import KevinAccountState, KevinDecisionType
@ -18,6 +20,56 @@ from shared.schemas.trading import SignalDirection, TradeSignal
logger = logging.getLogger(__name__)
_ET = ZoneInfo("America/New_York")
def should_run_exit_scan(
cron: str,
now_et: datetime,
last_run_date: date | None,
) -> bool:
"""Decide whether the daily exit-scan should run right now.
``croniter`` is not a project dependency, so we parse only the fields the
Kevin schedule uses ``minute hour * * dow`` and apply a simple
once-per-ET-weekday gate:
* fire only on a weekday listed in the cron's day-of-week field,
* only at/after the cron's HH:MM (ET),
* at most once per ET calendar day (tracked via ``last_run_date``).
The hour/minute and DOW are honoured; the day-of-month / month fields are
treated as wildcards (the Kevin cron always sets them to ``*``).
"""
minute, hour, _dom, _month, dow = cron.split()
target_minutes = int(hour) * 60 + int(minute)
# cron DOW: 0/7 = Sunday … 6 = Saturday. Python weekday(): Mon=0 … Sun=6.
allowed_dows = _parse_cron_dow(dow)
py_to_cron = {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 0}
if py_to_cron[now_et.weekday()] not in allowed_dows:
return False
if now_et.hour * 60 + now_et.minute < target_minutes:
return False
return last_run_date != now_et.date()
def _parse_cron_dow(dow: str) -> set[int]:
"""Expand a cron day-of-week field (``*``, ``1-5``, ``1,3,5``) to a set of
cron DOW integers (0/7 = Sunday 6 = Saturday)."""
if dow == "*":
return set(range(7))
days: set[int] = set()
for part in dow.split(","):
if "-" in part:
lo, hi = (int(x) for x in part.split("-"))
days.update(range(lo, hi + 1))
else:
days.add(int(part))
return days
class KevinBridge:
"""End-to-end orchestrator. Composed from injected collaborators
@ -239,6 +291,7 @@ async def run() -> None:
from services.kevin_signal_bridge.blocklist import KevinBlocklist
from services.kevin_signal_bridge.config import KevinBridgeConfig
from services.kevin_signal_bridge.cursor import RedisCursor
from services.kevin_signal_bridge.exit_scanner import ExitScanner
from services.kevin_signal_bridge.risk_counters import KevinRiskCounters
from shared.broker.alpaca_broker import AlpacaBroker
from shared.db import create_db
@ -316,6 +369,18 @@ async def run() -> None:
risk_counters=risk_counters,
)
# Daily exit scan — emits EXIT signals for Kevin positions whose hold has
# elapsed and that are STILL held at the broker. Shares the bridge's
# publisher + broker; gated to fire once per ET weekday (see
# should_run_exit_scan; croniter is intentionally not a dependency).
exit_scanner = ExitScanner(
session_factory=session_factory,
publisher=publisher,
config=config,
broker=broker,
)
last_exit_scan_date: date | None = None
stop = asyncio.Event()
def _on_signal(*_: Any) -> None:
@ -343,6 +408,18 @@ async def run() -> None:
except Exception:
logger.exception("Bridge poll iteration failed")
if should_run_exit_scan(
config.kevin_bridge_exit_scan_cron,
datetime.now(_ET),
last_exit_scan_date,
):
try:
emitted = await exit_scanner.scan_and_emit_exits()
last_exit_scan_date = datetime.now(_ET).date()
logger.info("Exit scan emitted %d EXIT signal(s)", emitted)
except Exception:
logger.exception("Exit scan failed")
try:
await asyncio.wait_for(
stop.wait(),

View file

@ -32,6 +32,7 @@ from shared.schemas.trading import (
OrderRequest,
OrderSide,
OrderStatus,
PositionInfo,
SignalDirection,
TradeExecution,
TradeSignal,
@ -56,6 +57,19 @@ async def _next_market_open(broker: AlpacaBroker) -> datetime:
return next_open.astimezone(timezone.utc)
async def _held_position(broker: AlpacaBroker, ticker: str) -> PositionInfo | None:
"""Return the currently-held position for *ticker*, or ``None`` if flat.
Used to size EXIT orders off the live broker position rather than the
signal's target_dollars.
"""
positions = await broker.get_positions()
for pos in positions:
if pos.ticker == ticker and pos.qty != 0:
return pos
return None
def _build_order_request(
signal: TradeSignal,
side: OrderSide,
@ -156,15 +170,32 @@ async def process_signal(
return
# --- Step 2: calculate position size ---
account = await broker.get_account()
qty = risk_manager.calculate_position_size(signal, account)
if qty <= 0:
logger.info("Position size is zero for %s — skipping", signal.ticker)
counters["rejections"].add(1, {"reason": "zero_position_size"})
return
# Entries (LONG) size from target_dollars/strength via the risk manager.
# Exits (EXIT/SELL) close the FULL currently-held broker position — a
# Kevin EXIT carries target_dollars, so sizing it via the risk manager
# would open/size a fresh position instead of flattening the existing one.
side = OrderSide.BUY if signal.direction == SignalDirection.LONG else OrderSide.SELL
exit_avg_entry: float | None = None
if signal.direction == SignalDirection.LONG:
account = await broker.get_account()
qty = risk_manager.calculate_position_size(signal, account)
if qty <= 0:
logger.info("Position size is zero for %s — skipping", signal.ticker)
counters["rejections"].add(1, {"reason": "zero_position_size"})
return
else:
held = await _held_position(broker, signal.ticker)
if held is None:
logger.info(
"EXIT for %s but no position held — skipping (no order)",
signal.ticker,
)
counters["rejections"].add(1, {"reason": "no_position_to_close"})
return
qty = abs(held.qty)
exit_avg_entry = held.avg_entry
# --- Step 3: create order ---
side = OrderSide.BUY if signal.direction == SignalDirection.LONG else OrderSide.SELL
order_request = _build_order_request(signal, side, qty, risk_manager)
# --- Step 4: submit order ---
@ -188,6 +219,18 @@ async def process_signal(
timestamp=result.timestamp,
)
# --- Step 5b: realized P&L on close ---
# The closing (EXIT) trade carries the round-trip P&L; entry trades leave
# pnl=None. avg_entry is captured from the held position BEFORE the sell.
# Only book P&L on a fill — a rejected/pending sell has no realized result.
realized_pnl: float | None = None
if (
exit_avg_entry is not None
and result.status == OrderStatus.FILLED
and result.filled_price is not None
):
realized_pnl = (result.filled_price - exit_avg_entry) * result.qty
# --- Step 6: persist trade to DB ---
if db_session_factory is not None:
try:
@ -212,6 +255,7 @@ async def process_signal(
signal_id=signal.signal_id,
strategy_id=signal.strategy_id,
status=status_map.get(result.status, TradeStatusModel.PENDING),
pnl=realized_pnl,
)
session.add(db_trade)
await session.commit()