"""Seed default trading strategies. Inserts three strategies with equal initial weights (0.333 each): - momentum - mean_reversion - news_driven Usage: python -m scripts.seed_strategies """ from __future__ import annotations import asyncio import logging from sqlalchemy import select from shared.config import BaseConfig from shared.db import create_db from shared.models.trading import Strategy logger = logging.getLogger(__name__) # Default strategies to seed DEFAULT_STRATEGIES = [ { "name": "momentum", "description": ( "Buy when price crosses above N-period SMA with increasing volume; " "sell when it crosses below." ), "current_weight": 0.333, "active": True, }, { "name": "mean_reversion", "description": ( "Buy when RSI < 30 (oversold); sell when RSI > 70 (overbought). " "Signal strength proportional to RSI extremity." ), "current_weight": 0.333, "active": True, }, { "name": "news_driven", "description": ( "Buy on strong positive sentiment (score > 0.7, confidence > 0.6); " "sell on strong negative. Decay factor for stale news (> 4 hours)." ), "current_weight": 0.333, "active": True, }, ] async def seed(database_url: str | None = None) -> None: """Insert default strategies if they do not already exist. Parameters ---------- database_url: Override for the database URL. If ``None``, the default from :class:`~shared.config.BaseConfig` is used. """ config = BaseConfig() if database_url: config.database_url = database_url _engine, session_factory = create_db(config) async with session_factory() as session: for strategy_data in DEFAULT_STRATEGIES: # Check if the strategy already exists by name result = await session.execute( select(Strategy).where(Strategy.name == strategy_data["name"]) ) existing = result.scalar_one_or_none() if existing: logger.info( "Strategy '%s' already exists (weight=%.3f), skipping", existing.name, existing.current_weight, ) continue strategy = Strategy(**strategy_data) session.add(strategy) logger.info( "Inserted strategy '%s' with weight %.3f", strategy_data["name"], strategy_data["current_weight"], ) await session.commit() await _engine.dispose() logger.info("Strategy seeding complete") def main() -> None: """CLI entry-point.""" logging.basicConfig(level=logging.INFO) asyncio.run(seed()) if __name__ == "__main__": main()