diff --git a/services/kevin_signal_bridge/main.py b/services/kevin_signal_bridge/main.py index b2f9930..90d29a4 100644 --- a/services/kevin_signal_bridge/main.py +++ b/services/kevin_signal_bridge/main.py @@ -197,16 +197,150 @@ class KevinBridge: return "deferred" -# --- service entry point (Task 11 will fill this in) --- +# --- concrete publisher (Pydantic -> stream) --- + + +class TradeSignalPublisher: + """Wraps a StreamPublisher to accept Pydantic TradeSignal objects.""" + + def __init__(self, stream_publisher: Any) -> None: + self.stream_publisher = stream_publisher + + async def publish(self, signal: TradeSignal) -> str: + payload = signal.model_dump(mode="json") + return str(await self.stream_publisher.publish(payload)) + + +# --- service entry point --- async def run() -> None: - """Boot the bridge with concrete collaborators + main loop. + """Boot the bridge with concrete collaborators + main poll loop.""" + import os + import signal as _signal + from decimal import Decimal as _D - Filled in by Task 11 wiring concrete cursor / aggregator / blocklist / - risk_counters / audit_writer. - """ - raise NotImplementedError("Task 11 will wire concrete collaborators") + from redis.asyncio import Redis + + from services.kevin_signal_bridge.aggregator import MentionAggregator + from services.kevin_signal_bridge.audit import AuditWriter + from services.kevin_signal_bridge.blocklist import KevinBlocklist + from services.kevin_signal_bridge.config import KevinBridgeConfig + from services.kevin_signal_bridge.cursor import RedisCursor + from services.kevin_signal_bridge.risk_counters import KevinRiskCounters + from shared.broker.alpaca_broker import AlpacaBroker + from shared.db import create_db + from shared.redis_streams import StreamPublisher + from shared.strategies.kevin import KevinStrategy, KevinStrategyConfig + + logging.basicConfig( + level=os.environ.get("TRADING_LOG_LEVEL", "INFO"), + format="%(asctime)s %(name)s %(levelname)s %(message)s", + ) + logger.info("Booting Kevin signal bridge") + + config = KevinBridgeConfig() + _engine, session_factory = create_db(config) + + redis = Redis.from_url(config.redis_url) + cursor = RedisCursor(redis) + blocklist = KevinBlocklist(redis) + risk_counters = KevinRiskCounters(redis) + + aggregator = MentionAggregator( + session_factory=session_factory, + window_hours=config.kevin_max_mention_age_hours, + boost_per_repeat=_D(str(config.kevin_mention_boost_per_repeat)), + max_boost=_D(str(config.kevin_max_mention_boost)), + ) + + strategy_cfg = KevinStrategyConfig( + min_conviction=_D(str(config.kevin_min_conviction)), + max_mention_age_hours=config.kevin_max_mention_age_hours, + base_position_pct=_D(str(config.kevin_base_position_pct)), + min_trade_usd=_D(str(config.kevin_min_trade_usd)), + max_trade_usd=_D(str(config.kevin_max_trade_usd)), + max_per_ticker_usd=_D(str(config.kevin_max_per_ticker_usd)), + hold_days_by_horizon=config.kevin_hold_days, + avoid_closes_longs=config.kevin_avoid_closes_longs, + avoid_blocks_days=config.kevin_avoid_blocks_days, + ) + strategy = KevinStrategy(strategy_cfg) + + audit_writer = AuditWriter(session_factory=session_factory) + stream_pub = StreamPublisher(redis, "signals:generated") + publisher = TradeSignalPublisher(stream_pub) + + api_key = os.environ.get("TRADING_ALPACA_API_KEY", "") + secret_key = os.environ.get("TRADING_ALPACA_SECRET_KEY", "") + broker: Any + if api_key and secret_key: + broker = AlpacaBroker(api_key=api_key, secret_key=secret_key, paper=True) + else: + # Run without a broker — useful for the dry-run smoke test + from unittest.mock import AsyncMock as _AsyncMock + from unittest.mock import MagicMock as _MagicMock + + logger.warning( + "Alpaca creds missing — running with stub broker (dry-run only)" + ) + broker = _AsyncMock() + broker.is_asset_tradable.return_value = True + broker.get_latest_price.return_value = _D("0") + broker.get_account.return_value = _MagicMock( + equity=_D("100000"), cash=_D("100000") + ) + broker.get_positions.return_value = [] + + bridge = KevinBridge( + config=config, + cursor=cursor, + publisher=publisher, + aggregator=aggregator, + strategy=strategy, + audit_writer=audit_writer, + broker=broker, + blocklist=blocklist, + risk_counters=risk_counters, + ) + + stop = asyncio.Event() + + def _on_signal(*_: Any) -> None: + logger.info("Received shutdown signal") + stop.set() + + loop = asyncio.get_running_loop() + for sig in (_signal.SIGINT, _signal.SIGTERM): + try: + loop.add_signal_handler(sig, _on_signal) + except NotImplementedError: + pass + + logger.info( + "Bridge running: poll_interval=%ss kill_switch_on=%s", + config.kevin_bridge_poll_interval_seconds, + config.kevin_enable_trading, + ) + + while not stop.is_set(): + try: + n = await bridge.process_one_pass() + if n: + logger.info("Processed %d mentions", n) + except Exception: + logger.exception("Bridge poll iteration failed") + + try: + await asyncio.wait_for( + stop.wait(), + timeout=config.kevin_bridge_poll_interval_seconds, + ) + except asyncio.TimeoutError: + pass + + logger.info("Bridge shutting down") + await redis.aclose() if __name__ == "__main__": diff --git a/shared/broker/alpaca_broker.py b/shared/broker/alpaca_broker.py index e5cda70..3e2fd42 100644 --- a/shared/broker/alpaca_broker.py +++ b/shared/broker/alpaca_broker.py @@ -10,6 +10,7 @@ from __future__ import annotations import asyncio import logging from datetime import datetime, timezone +from decimal import Decimal from alpaca.common.exceptions import APIError from alpaca.trading.client import TradingClient @@ -238,3 +239,32 @@ class AlpacaBroker(BaseBroker): self._client.get_order_by_id, order_id ) return self._order_to_result(alpaca_order) + + async def is_asset_tradable(self, symbol: str) -> bool: + """Return True iff Alpaca lists *symbol* as tradable. + + Conservative on failure (returns False) so the bridge skips the + mention instead of half-trading it. + """ + try: + asset = await asyncio.to_thread(self._client.get_asset, symbol) + return bool(getattr(asset, "tradable", False)) + except Exception: + return False + + async def get_latest_price(self, symbol: str) -> Decimal: + """Last trade price for *symbol* as Decimal. Returns Decimal('0') + on lookup failure — callers should treat 0 as 'no price available'. + """ + try: + # alpaca-py uses MarketDataClient for prices; the TradingClient + # does not expose latest_trade. Use the get_assets fallback + # (returns last_close) when present. + asset = await asyncio.to_thread(self._client.get_asset, symbol) + for attr in ("last_trade_price", "close", "last_close"): + v = getattr(asset, attr, None) + if v is not None: + return Decimal(str(v)) + except Exception: + pass + return Decimal("0")