feat(kevin_bridge): service entrypoint with concrete wiring
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

Wires the dependency-injected KevinBridge to concrete Redis cursor +
DB session factory + AlpacaBroker (or stub when creds missing). Includes
TradeSignalPublisher (Pydantic -> dict for the redis stream) and
SIGINT/SIGTERM graceful shutdown. Adds is_asset_tradable + get_latest_price
to AlpacaBroker so the bridge can query asset metadata.
This commit is contained in:
Viktor Barzin 2026-05-24 01:06:18 +00:00
parent cff2564428
commit db103df9b1
2 changed files with 170 additions and 6 deletions

View file

@ -197,16 +197,150 @@ class KevinBridge:
return "deferred" return "deferred"
# --- service entry point (Task 11 will fill this in) --- # --- concrete publisher (Pydantic -> stream) ---
class TradeSignalPublisher:
"""Wraps a StreamPublisher to accept Pydantic TradeSignal objects."""
def __init__(self, stream_publisher: Any) -> None:
self.stream_publisher = stream_publisher
async def publish(self, signal: TradeSignal) -> str:
payload = signal.model_dump(mode="json")
return str(await self.stream_publisher.publish(payload))
# --- service entry point ---
async def run() -> None: async def run() -> None:
"""Boot the bridge with concrete collaborators + main loop. """Boot the bridge with concrete collaborators + main poll loop."""
import os
import signal as _signal
from decimal import Decimal as _D
Filled in by Task 11 wiring concrete cursor / aggregator / blocklist / from redis.asyncio import Redis
risk_counters / audit_writer.
""" from services.kevin_signal_bridge.aggregator import MentionAggregator
raise NotImplementedError("Task 11 will wire concrete collaborators") from services.kevin_signal_bridge.audit import AuditWriter
from services.kevin_signal_bridge.blocklist import KevinBlocklist
from services.kevin_signal_bridge.config import KevinBridgeConfig
from services.kevin_signal_bridge.cursor import RedisCursor
from services.kevin_signal_bridge.risk_counters import KevinRiskCounters
from shared.broker.alpaca_broker import AlpacaBroker
from shared.db import create_db
from shared.redis_streams import StreamPublisher
from shared.strategies.kevin import KevinStrategy, KevinStrategyConfig
logging.basicConfig(
level=os.environ.get("TRADING_LOG_LEVEL", "INFO"),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logger.info("Booting Kevin signal bridge")
config = KevinBridgeConfig()
_engine, session_factory = create_db(config)
redis = Redis.from_url(config.redis_url)
cursor = RedisCursor(redis)
blocklist = KevinBlocklist(redis)
risk_counters = KevinRiskCounters(redis)
aggregator = MentionAggregator(
session_factory=session_factory,
window_hours=config.kevin_max_mention_age_hours,
boost_per_repeat=_D(str(config.kevin_mention_boost_per_repeat)),
max_boost=_D(str(config.kevin_max_mention_boost)),
)
strategy_cfg = KevinStrategyConfig(
min_conviction=_D(str(config.kevin_min_conviction)),
max_mention_age_hours=config.kevin_max_mention_age_hours,
base_position_pct=_D(str(config.kevin_base_position_pct)),
min_trade_usd=_D(str(config.kevin_min_trade_usd)),
max_trade_usd=_D(str(config.kevin_max_trade_usd)),
max_per_ticker_usd=_D(str(config.kevin_max_per_ticker_usd)),
hold_days_by_horizon=config.kevin_hold_days,
avoid_closes_longs=config.kevin_avoid_closes_longs,
avoid_blocks_days=config.kevin_avoid_blocks_days,
)
strategy = KevinStrategy(strategy_cfg)
audit_writer = AuditWriter(session_factory=session_factory)
stream_pub = StreamPublisher(redis, "signals:generated")
publisher = TradeSignalPublisher(stream_pub)
api_key = os.environ.get("TRADING_ALPACA_API_KEY", "")
secret_key = os.environ.get("TRADING_ALPACA_SECRET_KEY", "")
broker: Any
if api_key and secret_key:
broker = AlpacaBroker(api_key=api_key, secret_key=secret_key, paper=True)
else:
# Run without a broker — useful for the dry-run smoke test
from unittest.mock import AsyncMock as _AsyncMock
from unittest.mock import MagicMock as _MagicMock
logger.warning(
"Alpaca creds missing — running with stub broker (dry-run only)"
)
broker = _AsyncMock()
broker.is_asset_tradable.return_value = True
broker.get_latest_price.return_value = _D("0")
broker.get_account.return_value = _MagicMock(
equity=_D("100000"), cash=_D("100000")
)
broker.get_positions.return_value = []
bridge = KevinBridge(
config=config,
cursor=cursor,
publisher=publisher,
aggregator=aggregator,
strategy=strategy,
audit_writer=audit_writer,
broker=broker,
blocklist=blocklist,
risk_counters=risk_counters,
)
stop = asyncio.Event()
def _on_signal(*_: Any) -> None:
logger.info("Received shutdown signal")
stop.set()
loop = asyncio.get_running_loop()
for sig in (_signal.SIGINT, _signal.SIGTERM):
try:
loop.add_signal_handler(sig, _on_signal)
except NotImplementedError:
pass
logger.info(
"Bridge running: poll_interval=%ss kill_switch_on=%s",
config.kevin_bridge_poll_interval_seconds,
config.kevin_enable_trading,
)
while not stop.is_set():
try:
n = await bridge.process_one_pass()
if n:
logger.info("Processed %d mentions", n)
except Exception:
logger.exception("Bridge poll iteration failed")
try:
await asyncio.wait_for(
stop.wait(),
timeout=config.kevin_bridge_poll_interval_seconds,
)
except asyncio.TimeoutError:
pass
logger.info("Bridge shutting down")
await redis.aclose()
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -10,6 +10,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from decimal import Decimal
from alpaca.common.exceptions import APIError from alpaca.common.exceptions import APIError
from alpaca.trading.client import TradingClient from alpaca.trading.client import TradingClient
@ -238,3 +239,32 @@ class AlpacaBroker(BaseBroker):
self._client.get_order_by_id, order_id self._client.get_order_by_id, order_id
) )
return self._order_to_result(alpaca_order) return self._order_to_result(alpaca_order)
async def is_asset_tradable(self, symbol: str) -> bool:
"""Return True iff Alpaca lists *symbol* as tradable.
Conservative on failure (returns False) so the bridge skips the
mention instead of half-trading it.
"""
try:
asset = await asyncio.to_thread(self._client.get_asset, symbol)
return bool(getattr(asset, "tradable", False))
except Exception:
return False
async def get_latest_price(self, symbol: str) -> Decimal:
"""Last trade price for *symbol* as Decimal. Returns Decimal('0')
on lookup failure callers should treat 0 as 'no price available'.
"""
try:
# alpaca-py uses MarketDataClient for prices; the TradingClient
# does not expose latest_trade. Use the get_assets fallback
# (returns last_close) when present.
asset = await asyncio.to_thread(self._client.get_asset, symbol)
for attr in ("last_trade_price", "close", "last_close"):
v = getattr(asset, attr, None)
if v is not None:
return Decimal(str(v))
except Exception:
pass
return Decimal("0")