feat(kevin_bridge): service entrypoint with concrete wiring
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Wires the dependency-injected KevinBridge to concrete Redis cursor + DB session factory + AlpacaBroker (or stub when creds missing). Includes TradeSignalPublisher (Pydantic -> dict for the redis stream) and SIGINT/SIGTERM graceful shutdown. Adds is_asset_tradable + get_latest_price to AlpacaBroker so the bridge can query asset metadata.
This commit is contained in:
parent
cff2564428
commit
db103df9b1
2 changed files with 170 additions and 6 deletions
|
|
@ -197,16 +197,150 @@ class KevinBridge:
|
||||||
return "deferred"
|
return "deferred"
|
||||||
|
|
||||||
|
|
||||||
# --- service entry point (Task 11 will fill this in) ---
|
# --- concrete publisher (Pydantic -> stream) ---
|
||||||
|
|
||||||
|
|
||||||
|
class TradeSignalPublisher:
|
||||||
|
"""Wraps a StreamPublisher to accept Pydantic TradeSignal objects."""
|
||||||
|
|
||||||
|
def __init__(self, stream_publisher: Any) -> None:
|
||||||
|
self.stream_publisher = stream_publisher
|
||||||
|
|
||||||
|
async def publish(self, signal: TradeSignal) -> str:
|
||||||
|
payload = signal.model_dump(mode="json")
|
||||||
|
return str(await self.stream_publisher.publish(payload))
|
||||||
|
|
||||||
|
|
||||||
|
# --- service entry point ---
|
||||||
|
|
||||||
|
|
||||||
async def run() -> None:
|
async def run() -> None:
|
||||||
"""Boot the bridge with concrete collaborators + main loop.
|
"""Boot the bridge with concrete collaborators + main poll loop."""
|
||||||
|
import os
|
||||||
|
import signal as _signal
|
||||||
|
from decimal import Decimal as _D
|
||||||
|
|
||||||
Filled in by Task 11 wiring concrete cursor / aggregator / blocklist /
|
from redis.asyncio import Redis
|
||||||
risk_counters / audit_writer.
|
|
||||||
"""
|
from services.kevin_signal_bridge.aggregator import MentionAggregator
|
||||||
raise NotImplementedError("Task 11 will wire concrete collaborators")
|
from services.kevin_signal_bridge.audit import AuditWriter
|
||||||
|
from services.kevin_signal_bridge.blocklist import KevinBlocklist
|
||||||
|
from services.kevin_signal_bridge.config import KevinBridgeConfig
|
||||||
|
from services.kevin_signal_bridge.cursor import RedisCursor
|
||||||
|
from services.kevin_signal_bridge.risk_counters import KevinRiskCounters
|
||||||
|
from shared.broker.alpaca_broker import AlpacaBroker
|
||||||
|
from shared.db import create_db
|
||||||
|
from shared.redis_streams import StreamPublisher
|
||||||
|
from shared.strategies.kevin import KevinStrategy, KevinStrategyConfig
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=os.environ.get("TRADING_LOG_LEVEL", "INFO"),
|
||||||
|
format="%(asctime)s %(name)s %(levelname)s %(message)s",
|
||||||
|
)
|
||||||
|
logger.info("Booting Kevin signal bridge")
|
||||||
|
|
||||||
|
config = KevinBridgeConfig()
|
||||||
|
_engine, session_factory = create_db(config)
|
||||||
|
|
||||||
|
redis = Redis.from_url(config.redis_url)
|
||||||
|
cursor = RedisCursor(redis)
|
||||||
|
blocklist = KevinBlocklist(redis)
|
||||||
|
risk_counters = KevinRiskCounters(redis)
|
||||||
|
|
||||||
|
aggregator = MentionAggregator(
|
||||||
|
session_factory=session_factory,
|
||||||
|
window_hours=config.kevin_max_mention_age_hours,
|
||||||
|
boost_per_repeat=_D(str(config.kevin_mention_boost_per_repeat)),
|
||||||
|
max_boost=_D(str(config.kevin_max_mention_boost)),
|
||||||
|
)
|
||||||
|
|
||||||
|
strategy_cfg = KevinStrategyConfig(
|
||||||
|
min_conviction=_D(str(config.kevin_min_conviction)),
|
||||||
|
max_mention_age_hours=config.kevin_max_mention_age_hours,
|
||||||
|
base_position_pct=_D(str(config.kevin_base_position_pct)),
|
||||||
|
min_trade_usd=_D(str(config.kevin_min_trade_usd)),
|
||||||
|
max_trade_usd=_D(str(config.kevin_max_trade_usd)),
|
||||||
|
max_per_ticker_usd=_D(str(config.kevin_max_per_ticker_usd)),
|
||||||
|
hold_days_by_horizon=config.kevin_hold_days,
|
||||||
|
avoid_closes_longs=config.kevin_avoid_closes_longs,
|
||||||
|
avoid_blocks_days=config.kevin_avoid_blocks_days,
|
||||||
|
)
|
||||||
|
strategy = KevinStrategy(strategy_cfg)
|
||||||
|
|
||||||
|
audit_writer = AuditWriter(session_factory=session_factory)
|
||||||
|
stream_pub = StreamPublisher(redis, "signals:generated")
|
||||||
|
publisher = TradeSignalPublisher(stream_pub)
|
||||||
|
|
||||||
|
api_key = os.environ.get("TRADING_ALPACA_API_KEY", "")
|
||||||
|
secret_key = os.environ.get("TRADING_ALPACA_SECRET_KEY", "")
|
||||||
|
broker: Any
|
||||||
|
if api_key and secret_key:
|
||||||
|
broker = AlpacaBroker(api_key=api_key, secret_key=secret_key, paper=True)
|
||||||
|
else:
|
||||||
|
# Run without a broker — useful for the dry-run smoke test
|
||||||
|
from unittest.mock import AsyncMock as _AsyncMock
|
||||||
|
from unittest.mock import MagicMock as _MagicMock
|
||||||
|
|
||||||
|
logger.warning(
|
||||||
|
"Alpaca creds missing — running with stub broker (dry-run only)"
|
||||||
|
)
|
||||||
|
broker = _AsyncMock()
|
||||||
|
broker.is_asset_tradable.return_value = True
|
||||||
|
broker.get_latest_price.return_value = _D("0")
|
||||||
|
broker.get_account.return_value = _MagicMock(
|
||||||
|
equity=_D("100000"), cash=_D("100000")
|
||||||
|
)
|
||||||
|
broker.get_positions.return_value = []
|
||||||
|
|
||||||
|
bridge = KevinBridge(
|
||||||
|
config=config,
|
||||||
|
cursor=cursor,
|
||||||
|
publisher=publisher,
|
||||||
|
aggregator=aggregator,
|
||||||
|
strategy=strategy,
|
||||||
|
audit_writer=audit_writer,
|
||||||
|
broker=broker,
|
||||||
|
blocklist=blocklist,
|
||||||
|
risk_counters=risk_counters,
|
||||||
|
)
|
||||||
|
|
||||||
|
stop = asyncio.Event()
|
||||||
|
|
||||||
|
def _on_signal(*_: Any) -> None:
|
||||||
|
logger.info("Received shutdown signal")
|
||||||
|
stop.set()
|
||||||
|
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
for sig in (_signal.SIGINT, _signal.SIGTERM):
|
||||||
|
try:
|
||||||
|
loop.add_signal_handler(sig, _on_signal)
|
||||||
|
except NotImplementedError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Bridge running: poll_interval=%ss kill_switch_on=%s",
|
||||||
|
config.kevin_bridge_poll_interval_seconds,
|
||||||
|
config.kevin_enable_trading,
|
||||||
|
)
|
||||||
|
|
||||||
|
while not stop.is_set():
|
||||||
|
try:
|
||||||
|
n = await bridge.process_one_pass()
|
||||||
|
if n:
|
||||||
|
logger.info("Processed %d mentions", n)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Bridge poll iteration failed")
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
stop.wait(),
|
||||||
|
timeout=config.kevin_bridge_poll_interval_seconds,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
logger.info("Bridge shutting down")
|
||||||
|
await redis.aclose()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ from __future__ import annotations
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
from alpaca.common.exceptions import APIError
|
from alpaca.common.exceptions import APIError
|
||||||
from alpaca.trading.client import TradingClient
|
from alpaca.trading.client import TradingClient
|
||||||
|
|
@ -238,3 +239,32 @@ class AlpacaBroker(BaseBroker):
|
||||||
self._client.get_order_by_id, order_id
|
self._client.get_order_by_id, order_id
|
||||||
)
|
)
|
||||||
return self._order_to_result(alpaca_order)
|
return self._order_to_result(alpaca_order)
|
||||||
|
|
||||||
|
async def is_asset_tradable(self, symbol: str) -> bool:
|
||||||
|
"""Return True iff Alpaca lists *symbol* as tradable.
|
||||||
|
|
||||||
|
Conservative on failure (returns False) so the bridge skips the
|
||||||
|
mention instead of half-trading it.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
asset = await asyncio.to_thread(self._client.get_asset, symbol)
|
||||||
|
return bool(getattr(asset, "tradable", False))
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def get_latest_price(self, symbol: str) -> Decimal:
|
||||||
|
"""Last trade price for *symbol* as Decimal. Returns Decimal('0')
|
||||||
|
on lookup failure — callers should treat 0 as 'no price available'.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# alpaca-py uses MarketDataClient for prices; the TradingClient
|
||||||
|
# does not expose latest_trade. Use the get_assets fallback
|
||||||
|
# (returns last_close) when present.
|
||||||
|
asset = await asyncio.to_thread(self._client.get_asset, symbol)
|
||||||
|
for attr in ("last_trade_price", "close", "last_close"):
|
||||||
|
v = getattr(asset, attr, None)
|
||||||
|
if v is not None:
|
||||||
|
return Decimal(str(v))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return Decimal("0")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue