From 52b3c764827ed87f7b0b16a34cb0d8c148242e9f Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Thu, 4 Jun 2026 22:13:30 +0000 Subject: [PATCH] feat(kevin): correct exits, realized P&L, wire exit scanner - executor: EXIT/SELL signals close the FULL held broker position (not a target_dollars-sized fresh order) and skip when flat - executor: book realized P&L on the closing trade ((fill - avg_entry)*qty) so the dashboard P&L + win-rate populate; entries leave pnl=None - exit scanner: wired into the bridge run loop on kevin_bridge_exit_scan_cron (daily ET gate; croniter intentionally not a dependency) plus an offsetting-SELL guard so it only emits exits for currently-held tickers [ci skip] Co-Authored-By: Claude Opus 4.8 --- services/kevin_signal_bridge/exit_scanner.py | 14 ++ services/kevin_signal_bridge/main.py | 77 ++++++ services/trade_executor/main.py | 58 ++++- .../kevin_signal_bridge/test_exit_scanner.py | 34 ++- .../test_exit_scanner_guard.py | 135 ++++++++++ .../services/kevin_signal_bridge/test_main.py | 47 +++- tests/services/test_trade_executor.py | 237 +++++++++++++++++- 7 files changed, 587 insertions(+), 15 deletions(-) create mode 100644 tests/services/kevin_signal_bridge/test_exit_scanner_guard.py diff --git a/services/kevin_signal_bridge/exit_scanner.py b/services/kevin_signal_bridge/exit_scanner.py index 80b5aa0..5e7b143 100644 --- a/services/kevin_signal_bridge/exit_scanner.py +++ b/services/kevin_signal_bridge/exit_scanner.py @@ -27,15 +27,26 @@ class ExitScanner: session_factory: Callable[..., Any], publisher: Any, config: Any, + broker: Any, ) -> None: self.session_factory = session_factory self.publisher = publisher self.config = config + self.broker = broker async def scan_and_emit_exits(self) -> int: """Returns the number of EXIT signals emitted.""" now = datetime.now(timezone.utc) emitted = 0 + + # Offsetting-SELL guard: only emit exits for tickers STILL held at the + # broker, so we never re-emit for an already-closed position. With zero + # open positions this set is empty → the scan is a safe no-op. + positions = await self.broker.get_positions() + held_tickers = {p.ticker for p in positions if p.qty != 0} + if not held_tickers: + return 0 + async with self.session_factory() as session: # Find open Kevin trades (FILLED, no closing trade yet on same ticker) open_trades = ( @@ -54,6 +65,9 @@ class ExitScanner: ) for trade in open_trades: + # Skip tickers no longer held at the broker (already closed). + if trade.ticker not in held_tickers: + continue # Find the source audit row to learn the original holding_days target async with self.session_factory() as session: audit = ( diff --git a/services/kevin_signal_bridge/main.py b/services/kevin_signal_bridge/main.py index ac42a28..bb3a0ae 100644 --- a/services/kevin_signal_bridge/main.py +++ b/services/kevin_signal_bridge/main.py @@ -9,8 +9,10 @@ from __future__ import annotations import asyncio import logging +from datetime import date, datetime from decimal import Decimal from typing import Any +from zoneinfo import ZoneInfo from shared.constants.kevin import KEVIN_STRATEGY_UUID from shared.schemas.kevin import KevinAccountState, KevinDecisionType @@ -18,6 +20,56 @@ from shared.schemas.trading import SignalDirection, TradeSignal logger = logging.getLogger(__name__) +_ET = ZoneInfo("America/New_York") + + +def should_run_exit_scan( + cron: str, + now_et: datetime, + last_run_date: date | None, +) -> bool: + """Decide whether the daily exit-scan should run right now. + + ``croniter`` is not a project dependency, so we parse only the fields the + Kevin schedule uses — ``minute hour * * dow`` — and apply a simple + once-per-ET-weekday gate: + + * fire only on a weekday listed in the cron's day-of-week field, + * only at/after the cron's HH:MM (ET), + * at most once per ET calendar day (tracked via ``last_run_date``). + + The hour/minute and DOW are honoured; the day-of-month / month fields are + treated as wildcards (the Kevin cron always sets them to ``*``). + """ + minute, hour, _dom, _month, dow = cron.split() + target_minutes = int(hour) * 60 + int(minute) + + # cron DOW: 0/7 = Sunday … 6 = Saturday. Python weekday(): Mon=0 … Sun=6. + allowed_dows = _parse_cron_dow(dow) + py_to_cron = {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 0} + if py_to_cron[now_et.weekday()] not in allowed_dows: + return False + + if now_et.hour * 60 + now_et.minute < target_minutes: + return False + + return last_run_date != now_et.date() + + +def _parse_cron_dow(dow: str) -> set[int]: + """Expand a cron day-of-week field (``*``, ``1-5``, ``1,3,5``) to a set of + cron DOW integers (0/7 = Sunday … 6 = Saturday).""" + if dow == "*": + return set(range(7)) + days: set[int] = set() + for part in dow.split(","): + if "-" in part: + lo, hi = (int(x) for x in part.split("-")) + days.update(range(lo, hi + 1)) + else: + days.add(int(part)) + return days + class KevinBridge: """End-to-end orchestrator. Composed from injected collaborators @@ -239,6 +291,7 @@ async def run() -> None: from services.kevin_signal_bridge.blocklist import KevinBlocklist from services.kevin_signal_bridge.config import KevinBridgeConfig from services.kevin_signal_bridge.cursor import RedisCursor + from services.kevin_signal_bridge.exit_scanner import ExitScanner from services.kevin_signal_bridge.risk_counters import KevinRiskCounters from shared.broker.alpaca_broker import AlpacaBroker from shared.db import create_db @@ -316,6 +369,18 @@ async def run() -> None: risk_counters=risk_counters, ) + # Daily exit scan — emits EXIT signals for Kevin positions whose hold has + # elapsed and that are STILL held at the broker. Shares the bridge's + # publisher + broker; gated to fire once per ET weekday (see + # should_run_exit_scan; croniter is intentionally not a dependency). + exit_scanner = ExitScanner( + session_factory=session_factory, + publisher=publisher, + config=config, + broker=broker, + ) + last_exit_scan_date: date | None = None + stop = asyncio.Event() def _on_signal(*_: Any) -> None: @@ -343,6 +408,18 @@ async def run() -> None: except Exception: logger.exception("Bridge poll iteration failed") + if should_run_exit_scan( + config.kevin_bridge_exit_scan_cron, + datetime.now(_ET), + last_exit_scan_date, + ): + try: + emitted = await exit_scanner.scan_and_emit_exits() + last_exit_scan_date = datetime.now(_ET).date() + logger.info("Exit scan emitted %d EXIT signal(s)", emitted) + except Exception: + logger.exception("Exit scan failed") + try: await asyncio.wait_for( stop.wait(), diff --git a/services/trade_executor/main.py b/services/trade_executor/main.py index 65ef5db..3490dac 100644 --- a/services/trade_executor/main.py +++ b/services/trade_executor/main.py @@ -32,6 +32,7 @@ from shared.schemas.trading import ( OrderRequest, OrderSide, OrderStatus, + PositionInfo, SignalDirection, TradeExecution, TradeSignal, @@ -56,6 +57,19 @@ async def _next_market_open(broker: AlpacaBroker) -> datetime: return next_open.astimezone(timezone.utc) +async def _held_position(broker: AlpacaBroker, ticker: str) -> PositionInfo | None: + """Return the currently-held position for *ticker*, or ``None`` if flat. + + Used to size EXIT orders off the live broker position rather than the + signal's target_dollars. + """ + positions = await broker.get_positions() + for pos in positions: + if pos.ticker == ticker and pos.qty != 0: + return pos + return None + + def _build_order_request( signal: TradeSignal, side: OrderSide, @@ -156,15 +170,32 @@ async def process_signal( return # --- Step 2: calculate position size --- - account = await broker.get_account() - qty = risk_manager.calculate_position_size(signal, account) - if qty <= 0: - logger.info("Position size is zero for %s — skipping", signal.ticker) - counters["rejections"].add(1, {"reason": "zero_position_size"}) - return + # Entries (LONG) size from target_dollars/strength via the risk manager. + # Exits (EXIT/SELL) close the FULL currently-held broker position — a + # Kevin EXIT carries target_dollars, so sizing it via the risk manager + # would open/size a fresh position instead of flattening the existing one. + side = OrderSide.BUY if signal.direction == SignalDirection.LONG else OrderSide.SELL + exit_avg_entry: float | None = None + if signal.direction == SignalDirection.LONG: + account = await broker.get_account() + qty = risk_manager.calculate_position_size(signal, account) + if qty <= 0: + logger.info("Position size is zero for %s — skipping", signal.ticker) + counters["rejections"].add(1, {"reason": "zero_position_size"}) + return + else: + held = await _held_position(broker, signal.ticker) + if held is None: + logger.info( + "EXIT for %s but no position held — skipping (no order)", + signal.ticker, + ) + counters["rejections"].add(1, {"reason": "no_position_to_close"}) + return + qty = abs(held.qty) + exit_avg_entry = held.avg_entry # --- Step 3: create order --- - side = OrderSide.BUY if signal.direction == SignalDirection.LONG else OrderSide.SELL order_request = _build_order_request(signal, side, qty, risk_manager) # --- Step 4: submit order --- @@ -188,6 +219,18 @@ async def process_signal( timestamp=result.timestamp, ) + # --- Step 5b: realized P&L on close --- + # The closing (EXIT) trade carries the round-trip P&L; entry trades leave + # pnl=None. avg_entry is captured from the held position BEFORE the sell. + # Only book P&L on a fill — a rejected/pending sell has no realized result. + realized_pnl: float | None = None + if ( + exit_avg_entry is not None + and result.status == OrderStatus.FILLED + and result.filled_price is not None + ): + realized_pnl = (result.filled_price - exit_avg_entry) * result.qty + # --- Step 6: persist trade to DB --- if db_session_factory is not None: try: @@ -212,6 +255,7 @@ async def process_signal( signal_id=signal.signal_id, strategy_id=signal.strategy_id, status=status_map.get(result.status, TradeStatusModel.PENDING), + pnl=realized_pnl, ) session.add(db_trade) await session.commit() diff --git a/tests/services/kevin_signal_bridge/test_exit_scanner.py b/tests/services/kevin_signal_bridge/test_exit_scanner.py index a2fbf26..545f00a 100644 --- a/tests/services/kevin_signal_bridge/test_exit_scanner.py +++ b/tests/services/kevin_signal_bridge/test_exit_scanner.py @@ -7,8 +7,6 @@ from unittest.mock import AsyncMock, MagicMock import pytest from sqlalchemy.ext.asyncio import AsyncSession -pytestmark = pytest.mark.integration - from services.kevin_signal_bridge.exit_scanner import ExitScanner from shared.constants.kevin import KEVIN_STRATEGY_UUID from shared.models.meet_kevin import ( @@ -23,6 +21,9 @@ from shared.models.meet_kevin import ( ) from shared.models.meet_kevin_trading import BridgeStatus, KevinSignalBridgeState from shared.models.trading import Trade, TradeSide, TradeStatus +from shared.schemas.trading import PositionInfo + +pytestmark = pytest.mark.integration def _factory(session: AsyncSession): @@ -39,6 +40,25 @@ def _factory(session: AsyncSession): return factory +def _broker_holding(*tickers: str): + """Mock broker reporting the given tickers as currently held.""" + broker = AsyncMock() + broker.get_positions = AsyncMock( + return_value=[ + PositionInfo( + ticker=t, + qty=10.0, + avg_entry=100.0, + current_price=110.0, + unrealized_pnl=100.0, + market_value=1100.0, + ) + for t in tickers + ] + ) + return broker + + async def _seed_video_and_mention(session: AsyncSession) -> int: channel = KevinChannel(youtube_channel_id="UCex", title="t") session.add(channel) @@ -111,7 +131,10 @@ async def test_exit_scanner_emits_on_elapsed_hold(db_session: AsyncSession): config = MagicMock(kevin_hold_days={"unspecified": 10}) scanner = ExitScanner( - session_factory=_factory(db_session), publisher=publisher, config=config + session_factory=_factory(db_session), + publisher=publisher, + config=config, + broker=_broker_holding("NVDA"), ) emitted = await scanner.scan_and_emit_exits() assert emitted == 1 @@ -149,7 +172,10 @@ async def test_exit_scanner_does_not_emit_when_hold_not_elapsed( config = MagicMock(kevin_hold_days={"unspecified": 10}) scanner = ExitScanner( - session_factory=_factory(db_session), publisher=publisher, config=config + session_factory=_factory(db_session), + publisher=publisher, + config=config, + broker=_broker_holding("NVDA"), ) emitted = await scanner.scan_and_emit_exits() assert emitted == 0 diff --git a/tests/services/kevin_signal_bridge/test_exit_scanner_guard.py b/tests/services/kevin_signal_bridge/test_exit_scanner_guard.py new file mode 100644 index 0000000..9349b82 --- /dev/null +++ b/tests/services/kevin_signal_bridge/test_exit_scanner_guard.py @@ -0,0 +1,135 @@ +"""Unit tests for ExitScanner's offsetting-SELL (held-at-broker) guard and +the no-op-with-zero-positions behaviour. + +These are pure unit tests (no DB fixture) — the session factory and broker +are mocked so they run in the default ``not integration`` suite. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from services.kevin_signal_bridge.exit_scanner import ExitScanner +from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.schemas.trading import PositionInfo + + +def _trade(ticker: str) -> SimpleNamespace: + return SimpleNamespace( + id=uuid.uuid4(), + ticker=ticker, + strategy_id=KEVIN_STRATEGY_UUID, + ) + + +def _audit(days_ago: int, hold_days: int = 10) -> SimpleNamespace: + return SimpleNamespace( + decided_at=datetime.now(timezone.utc) - timedelta(days=days_ago), + notes=f"BUY hold={hold_days}d", + ) + + +def _session_factory(trades: list, audit: SimpleNamespace | None): + """Fake session factory whose ``execute`` returns *trades* on the first + call (the open-trades query) and *audit* on every subsequent per-trade + audit lookup.""" + + class _Result: + def __init__(self, payload): + self._payload = payload + + def scalars(self): + return self + + def all(self): + return self._payload + + def one_or_none(self): + return self._payload + + class _Session: + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + pass + + async def execute(self, stmt): + # Route by which table the SELECT targets — the open-trades query + # hits `trades`, the per-trade audit lookup hits the bridge-state + # table. Robust against separate `async with` blocks. + if "trades" in str(stmt).lower(): + return _Result(trades) + return _Result(audit) + + def factory(): + return _Session() + + return factory + + +def _position(ticker: str, qty: float = 10.0) -> PositionInfo: + return PositionInfo( + ticker=ticker, + qty=qty, + avg_entry=100.0, + current_price=110.0, + unrealized_pnl=100.0, + market_value=qty * 110.0, + ) + + +def _broker(positions: list[PositionInfo]): + broker = AsyncMock() + broker.get_positions = AsyncMock(return_value=positions) + return broker + + +@pytest.mark.asyncio +async def test_emits_exit_for_held_ticker_past_hold(): + """Hold elapsed AND still held at broker → emit one EXIT.""" + scanner = ExitScanner( + session_factory=_session_factory([_trade("NVDA")], _audit(days_ago=30)), + publisher=AsyncMock(), + config=MagicMock(kevin_hold_days={"unspecified": 10}), + broker=_broker([_position("NVDA")]), + ) + emitted = await scanner.scan_and_emit_exits() + assert emitted == 1 + scanner.publisher.publish.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_no_emit_when_ticker_no_longer_held(): + """Offsetting guard: hold elapsed but the position is already closed + (broker holds nothing for that ticker) → emit nothing.""" + scanner = ExitScanner( + session_factory=_session_factory([_trade("NVDA")], _audit(days_ago=30)), + publisher=AsyncMock(), + config=MagicMock(kevin_hold_days={"unspecified": 10}), + broker=_broker([]), # nothing held + ) + emitted = await scanner.scan_and_emit_exits() + assert emitted == 0 + scanner.publisher.publish.assert_not_called() + + +@pytest.mark.asyncio +async def test_noop_with_zero_open_positions(): + """Zero positions at the broker → safe no-op regardless of open trades.""" + scanner = ExitScanner( + session_factory=_session_factory( + [_trade("NVDA"), _trade("AAPL")], _audit(days_ago=30) + ), + publisher=AsyncMock(), + config=MagicMock(kevin_hold_days={"unspecified": 10}), + broker=_broker([]), + ) + emitted = await scanner.scan_and_emit_exits() + assert emitted == 0 + scanner.publisher.publish.assert_not_called() diff --git a/tests/services/kevin_signal_bridge/test_main.py b/tests/services/kevin_signal_bridge/test_main.py index bc7c9c8..b21a07b 100644 --- a/tests/services/kevin_signal_bridge/test_main.py +++ b/tests/services/kevin_signal_bridge/test_main.py @@ -1,9 +1,13 @@ """Smoke-level orchestrator tests for the Kevin signal bridge.""" +from datetime import date, datetime from decimal import Decimal from unittest.mock import AsyncMock, MagicMock +from zoneinfo import ZoneInfo -from services.kevin_signal_bridge.main import KevinBridge +from services.kevin_signal_bridge.main import KevinBridge, should_run_exit_scan + +_ET = ZoneInfo("America/New_York") async def test_bridge_dry_run_writes_audit_does_not_publish(): @@ -232,3 +236,44 @@ async def test_bridge_advances_cursor_only_after_publish(): # cursor.advance must NOT have been called for mention 1 cursor.advance.assert_not_called() + + +# --------------------------------------------------------------------------- +# Exit-scan cron gate (croniter unavailable → simple daily ET-weekday gate) +# --------------------------------------------------------------------------- + +_CRON = "35 9 * * 1-5" # 09:35 ET, Mon-Fri + + +def test_exit_scan_gate_fires_at_or_after_time_on_weekday(): + """Wed 09:36 ET, not yet run today → fire.""" + now = datetime(2026, 6, 3, 9, 36, tzinfo=_ET) # Wednesday + assert should_run_exit_scan(_CRON, now, last_run_date=None) is True + + +def test_exit_scan_gate_does_not_fire_before_time(): + """Wed 09:34 ET (one minute early) → do not fire.""" + now = datetime(2026, 6, 3, 9, 34, tzinfo=_ET) + assert should_run_exit_scan(_CRON, now, last_run_date=None) is False + + +def test_exit_scan_gate_does_not_fire_on_weekend(): + """Saturday 10:00 ET → do not fire (cron DOW is 1-5).""" + now = datetime(2026, 6, 6, 10, 0, tzinfo=_ET) # Saturday + assert should_run_exit_scan(_CRON, now, last_run_date=None) is False + + +def test_exit_scan_gate_runs_once_per_day(): + """Already ran today → do not fire again the same ET day.""" + now = datetime(2026, 6, 3, 11, 0, tzinfo=_ET) # Wednesday, well past 09:35 + assert ( + should_run_exit_scan(_CRON, now, last_run_date=date(2026, 6, 3)) is False + ) + + +def test_exit_scan_gate_fires_next_day_after_prior_run(): + """Ran yesterday → fire again today once past the cron time.""" + now = datetime(2026, 6, 4, 9, 40, tzinfo=_ET) # Thursday + assert ( + should_run_exit_scan(_CRON, now, last_run_date=date(2026, 6, 3)) is True + ) diff --git a/tests/services/test_trade_executor.py b/tests/services/test_trade_executor.py index fc44ed5..efe3870 100644 --- a/tests/services/test_trade_executor.py +++ b/tests/services/test_trade_executor.py @@ -9,6 +9,7 @@ from __future__ import annotations from datetime import datetime, timedelta, timezone from decimal import Decimal from unittest.mock import AsyncMock, MagicMock, patch +from uuid import UUID from zoneinfo import ZoneInfo import pytest @@ -17,6 +18,7 @@ from services.trade_executor.config import TradeExecutorConfig from services.trade_executor.main import process_signal from services.trade_executor.risk_manager import RiskManager from shared.constants.kevin import KEVIN_STRATEGY_UUID +from shared.models.trading import TradeSide as TradeSideModel from shared.schemas.trading import ( AccountInfo, OrderResult, @@ -557,7 +559,9 @@ class TestExecutorBracketOrders: """The bridge stamps stop/take pcts even on EXIT signals; the direction guard must keep the resulting SELL order SIMPLE.""" config = _make_config() - broker = _mock_broker(positions=[], account=_make_account(100_000)) + # EXIT now requires a held position to size from. + held = _make_position(ticker="NVDA", market_value=2000.0) + broker = _mock_broker(positions=[held], account=_make_account(100_000)) publisher = AsyncMock() publisher.publish = AsyncMock(return_value=b"1-0") counters = { @@ -586,6 +590,235 @@ class TestExecutorBracketOrders: assert order_arg.stop_loss_price is None +# --------------------------------------------------------------------------- +# Executor flow — EXIT sizing from the held broker position +# --------------------------------------------------------------------------- + + +def _held_position(ticker: str, qty: float, avg_entry: float) -> PositionInfo: + return PositionInfo( + ticker=ticker, + qty=qty, + avg_entry=avg_entry, + current_price=avg_entry, + unrealized_pnl=0.0, + market_value=qty * avg_entry, + ) + + +def _exit_filled_broker( + positions: list[PositionInfo], fill_price: float, fill_qty: float +): + """Broker whose submit_order returns a FILLED SELL at fill_price/fill_qty.""" + broker = AsyncMock() + broker.get_positions = AsyncMock(return_value=positions) + broker.get_account = AsyncMock(return_value=_make_account(100_000)) + broker.submit_order = AsyncMock( + return_value=OrderResult( + order_id="ord-exit", + ticker=positions[0].ticker if positions else "NVDA", + side=OrderSide.SELL, + qty=fill_qty, + filled_price=fill_price, + status=OrderStatus.FILLED, + timestamp=datetime.now(timezone.utc), + ) + ) + return broker + + +class TestExecutorExitSizing: + """EXIT signals must be sized from the currently-held broker position, + NOT from the signal's target_dollars (which would open/size a fresh + position).""" + + @pytest.mark.asyncio + async def test_exit_sells_full_held_qty(self): + """A Kevin EXIT carrying target_dollars=$2000 (=20 sh @ $100) on a + position of 37 held shares must SELL 37 — the full held qty.""" + config = _make_config() + held = _held_position("NVDA", qty=37.0, avg_entry=90.0) + broker = _exit_filled_broker([held], fill_price=110.0, fill_qty=37.0) + publisher = AsyncMock() + publisher.publish = AsyncMock(return_value=b"1-0") + counters = { + "trades_executed": MagicMock(), + "rejections": MagicMock(), + "fill_latency": MagicMock(), + } + + signal = _make_kevin_signal( + ticker="NVDA", + direction=SignalDirection.EXIT, + current_price=Decimal("100"), + target_dollars=Decimal("2000"), + ) + + with patch.object(RiskManager, "check_risk", return_value=(True, "approved")): + await process_signal( + signal, RiskManager(config, broker), broker, publisher, counters + ) + + broker.submit_order.assert_called_once() + order_arg = broker.submit_order.call_args[0][0] + assert order_arg.side == OrderSide.SELL + assert order_arg.qty == 37.0 # held qty, NOT 20 (=target_dollars/price) + assert order_arg.order_class == "simple" + + @pytest.mark.asyncio + async def test_exit_with_no_held_position_submits_nothing(self): + """EXIT for a ticker with no held position → no order, skip logged, + rejection counted (never a zero/garbage sell).""" + config = _make_config() + # Holds a DIFFERENT ticker — nothing for NVDA. + broker = _exit_filled_broker( + [_held_position("AAPL", qty=10.0, avg_entry=150.0)], + fill_price=110.0, + fill_qty=0.0, + ) + publisher = AsyncMock() + publisher.publish = AsyncMock(return_value=b"1-0") + counters = { + "trades_executed": MagicMock(), + "rejections": MagicMock(), + "fill_latency": MagicMock(), + } + + signal = _make_kevin_signal( + ticker="NVDA", + direction=SignalDirection.EXIT, + current_price=Decimal("100"), + target_dollars=Decimal("2000"), + ) + + with patch.object(RiskManager, "check_risk", return_value=(True, "approved")): + await process_signal( + signal, RiskManager(config, broker), broker, publisher, counters + ) + + broker.submit_order.assert_not_called() + publisher.publish.assert_not_called() + counters["rejections"].add.assert_called_once() + + @pytest.mark.asyncio + async def test_entry_sizing_path_unchanged(self): + """LONG entries keep the risk_manager.calculate_position_size path — + target_dollars=$2000 @ $100 → 20 shares (not driven by held qty).""" + config = _make_config() + broker = _mock_broker(positions=[], account=_make_account(100_000)) + publisher = AsyncMock() + publisher.publish = AsyncMock(return_value=b"1-0") + counters = { + "trades_executed": MagicMock(), + "rejections": MagicMock(), + "fill_latency": MagicMock(), + } + + signal = _make_kevin_signal( + ticker="NVDA", + direction=SignalDirection.LONG, + current_price=Decimal("100"), + target_dollars=Decimal("2000"), + ) + + with patch.object(RiskManager, "check_risk", return_value=(True, "approved")): + await process_signal( + signal, RiskManager(config, broker), broker, publisher, counters + ) + + broker.submit_order.assert_called_once() + order_arg = broker.submit_order.call_args[0][0] + assert order_arg.side == OrderSide.BUY + assert order_arg.qty == 20.0 # target_dollars / current_price + + +# --------------------------------------------------------------------------- +# Executor flow — realized P&L on close +# --------------------------------------------------------------------------- + + +class TestExecutorRealizedPnl: + """When an EXIT fill closes a long, the persisted Trade row carries the + round-trip realized P&L; ENTRY trades leave pnl=None.""" + + @pytest.mark.asyncio + async def test_exit_writes_realized_pnl(self): + """SELL 10 @ 110 against avg_entry 90 → pnl = (110-90)*10 = 200.""" + config = _make_config() + held = _held_position("NVDA", qty=10.0, avg_entry=90.0) + broker = _exit_filled_broker([held], fill_price=110.0, fill_qty=10.0) + publisher = AsyncMock() + publisher.publish = AsyncMock(return_value=b"1-0") + counters = { + "trades_executed": MagicMock(), + "rejections": MagicMock(), + "fill_latency": MagicMock(), + } + + signal = _make_kevin_signal( + ticker="NVDA", + direction=SignalDirection.EXIT, + current_price=Decimal("100"), + target_dollars=Decimal("2000"), + ) + mock_session = AsyncMock() + mock_session.add = MagicMock() + mock_session.commit = AsyncMock() + db_factory = _make_mock_db_session_factory(mock_session) + + with patch.object(RiskManager, "check_risk", return_value=(True, "approved")): + await process_signal( + signal, + RiskManager(config, broker), + broker, + publisher, + counters, + db_factory, + ) + + trade_obj = mock_session.add.call_args[0][0] + assert trade_obj.side == TradeSideModel.SELL + assert trade_obj.pnl == 200.0 + + @pytest.mark.asyncio + async def test_entry_trade_has_null_pnl(self): + """An ENTRY (LONG) trade is persisted with pnl=None.""" + config = _make_config() + broker = _mock_broker(positions=[], account=_make_account(100_000)) + publisher = AsyncMock() + publisher.publish = AsyncMock(return_value=b"1-0") + counters = { + "trades_executed": MagicMock(), + "rejections": MagicMock(), + "fill_latency": MagicMock(), + } + + signal = _make_kevin_signal( + ticker="NVDA", + direction=SignalDirection.LONG, + current_price=Decimal("100"), + target_dollars=Decimal("2000"), + ) + mock_session = AsyncMock() + mock_session.add = MagicMock() + mock_session.commit = AsyncMock() + db_factory = _make_mock_db_session_factory(mock_session) + + with patch.object(RiskManager, "check_risk", return_value=(True, "approved")): + await process_signal( + signal, + RiskManager(config, broker), + broker, + publisher, + counters, + db_factory, + ) + + trade_obj = mock_session.add.call_args[0][0] + assert trade_obj.side == TradeSideModel.BUY + assert trade_obj.pnl is None + + # --------------------------------------------------------------------------- # Executor flow — rejected signal # --------------------------------------------------------------------------- @@ -734,8 +967,6 @@ class TestExecutorDBPersistence: """signal_id from TradeSignal should appear in the published TradeExecution.""" signal = _make_signal(ticker="AAPL", strength=0.8, current_price=150.0) assert signal.signal_id is not None - # Verify signal_id is a UUID - from uuid import UUID assert isinstance(signal.signal_id, UUID)