trading/services/api_gateway/tasks/portfolio_sync.py
Viktor Barzin a3cdd0f1a5
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix: resolve all remaining TODOs, add dev mode auth bypass
- Learning engine: expand default weights from 3 to all 9 strategies
- Learning engine: resolve placeholder strategy_id with DB lookup
- Learning engine: pass strategy_sources from trade execution
- Trade executor: respect trading:paused Redis flag in RiskManager
- Portfolio sync: compute actual daily P&L from day-start snapshot
- Portfolio API: cumulative P&L from first snapshot, read pause flag
- Portfolio metrics: compute max drawdown and avg hold duration
- Add strategy_sources field to TradeExecution schema
- Add dev_mode config (TRADING_DEV_MODE) to bypass auth for local dev
- Dashboard: VITE_DEV_MODE bypasses ProtectedRoute and 401 redirects
- Vite proxy target configurable via VITE_API_TARGET
- Add top-level README.md and remaining-work-plan.md
- Update CLAUDE.md with correct counts and remove stale TODOs
- 404 tests passing

Made-with: Cursor
2026-02-25 22:02:25 +00:00

170 lines
5.6 KiB
Python

"""Background task that periodically snapshots Alpaca account state into the DB.
Runs on a configurable interval (default 60s) during US market hours,
creating ``PortfolioSnapshot`` rows and upserting ``Position`` rows so
the dashboard portfolio page reflects real brokerage data.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, time, timezone
from zoneinfo import ZoneInfo
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import async_sessionmaker
from services.api_gateway.config import ApiGatewayConfig
from shared.broker.alpaca_broker import AlpacaBroker
from shared.models.timeseries import PortfolioSnapshot
from shared.models.trading import Position
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Regular US equity session expressed as New York wall-clock times.
# The session window is half-open: the opening minute counts as open,
# the closing minute does not.
_ET = ZoneInfo("America/New_York")
_MARKET_OPEN = time(hour=9, minute=30)
_MARKET_CLOSE = time(hour=16)
def is_market_open(now_utc: datetime | None = None) -> bool:
    """Return ``True`` if the US stock market is currently open.

    Checks for weekday (Mon-Fri) and time between 9:30 AM (inclusive) and
    4:00 PM (exclusive) in US Eastern time. Exchange holidays and early
    closes are *not* taken into account.

    Parameters
    ----------
    now_utc:
        The instant to test. Defaults to the current time. A naive
        datetime is interpreted as UTC (as the parameter name implies)
        rather than as host-local time.
    """
    if now_utc is None:
        now_utc = datetime.now(timezone.utc)
    elif now_utc.utcoffset() is None:
        # ``astimezone`` treats naive datetimes as *system local* time, which
        # would silently shift the result on any host not running in UTC.
        now_utc = now_utc.replace(tzinfo=timezone.utc)

    now_et = now_utc.astimezone(_ET)

    # Weekday check: Monday=0 ... Friday=4
    if now_et.weekday() >= 5:
        return False

    return _MARKET_OPEN <= now_et.time() < _MARKET_CLOSE
async def _sync_once(
    broker: AlpacaBroker,
    session_factory: async_sessionmaker,
) -> None:
    """Perform a single portfolio snapshot and position upsert cycle.

    Reads the account and open positions from the broker, inserts one
    ``PortfolioSnapshot`` row, and reconciles the ``Position`` table with the
    broker: matching tickers are updated, new tickers are inserted, and
    tickers the broker no longer reports are deleted. The snapshot insert
    and all position changes share one transaction.

    Parameters
    ----------
    broker:
        Broker client providing ``get_account()`` and ``get_positions()``.
    session_factory:
        SQLAlchemy async session factory for DB access.

    Raises
    ------
    Exception
        Nothing is caught here; broker and database errors propagate to the
        caller.
    """
    now = datetime.now(timezone.utc)

    # 1. Snapshot account state
    account = await broker.get_account()

    # 2. Compute daily P&L: difference from the first snapshot today.
    #    "Today" starts at UTC midnight; if no snapshot exists yet for the
    #    day, this first one records a P&L of 0.0.
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    daily_pnl = 0.0
    async with session_factory() as read_session:
        day_start_snapshot = (
            await read_session.execute(
                select(PortfolioSnapshot)
                .where(PortfolioSnapshot.timestamp >= today_start)
                .order_by(PortfolioSnapshot.timestamp)
                .limit(1)
            )
        ).scalar_one_or_none()
        if day_start_snapshot is not None:
            daily_pnl = account.portfolio_value - day_start_snapshot.total_value

    snapshot = PortfolioSnapshot(
        timestamp=now,
        total_value=account.portfolio_value,
        cash=account.cash,
        positions_value=account.portfolio_value - account.cash,
        daily_pnl=daily_pnl,
    )

    # 3. Fetch broker positions
    broker_positions = await broker.get_positions()
    broker_tickers = {p.ticker for p in broker_positions}

    async with session_factory() as session:
        # ``session.begin()`` commits on normal exit and rolls back on error.
        async with session.begin():
            # Insert portfolio snapshot
            session.add(snapshot)

            # Load every locally stored position the broker still reports in
            # ONE query, instead of issuing a SELECT per ticker in the loop.
            existing_by_ticker = {}
            if broker_tickers:
                result = await session.execute(
                    select(Position).where(Position.ticker.in_(broker_tickers))
                )
                existing_by_ticker = {
                    row.ticker: row for row in result.scalars()
                }

            # Upsert positions
            for pos_info in broker_positions:
                existing = existing_by_ticker.get(pos_info.ticker)
                if existing is not None:
                    existing.qty = pos_info.qty
                    existing.avg_entry = pos_info.avg_entry
                    existing.unrealized_pnl = pos_info.unrealized_pnl
                else:
                    new_pos = Position(
                        ticker=pos_info.ticker,
                        qty=pos_info.qty,
                        avg_entry=pos_info.avg_entry,
                        unrealized_pnl=pos_info.unrealized_pnl,
                        stop_loss=None,
                        take_profit=None,
                    )
                    session.add(new_pos)
                    # Remember it so a repeated ticker in the broker payload
                    # updates this row instead of inserting a duplicate.
                    existing_by_ticker[pos_info.ticker] = new_pos

            # 4. Remove positions that are no longer held at the broker
            if broker_tickers:
                await session.execute(
                    delete(Position).where(Position.ticker.notin_(broker_tickers))
                )
            else:
                # No positions at broker — delete all local positions
                await session.execute(delete(Position))

    logger.info(
        "Portfolio sync complete: value=%.2f, cash=%.2f, positions=%d",
        account.portfolio_value,
        account.cash,
        len(broker_positions),
    )
async def portfolio_sync_loop(
    config: ApiGatewayConfig,
    session_factory: async_sessionmaker,
) -> None:
    """Run the portfolio sync loop until cancelled.

    Each iteration performs one sync if the market is open, then sleeps for
    ``config.snapshot_interval_seconds``. Errors from a sync are logged and
    the loop carries on; only cancellation ends it.

    Parameters
    ----------
    config:
        API Gateway configuration containing Alpaca credentials and
        the snapshot interval.
    session_factory:
        SQLAlchemy async session factory for DB access.

    Returns
    -------
    None
        Returns immediately (after a warning) when Alpaca credentials are
        not configured; otherwise it only exits by raising.

    Raises
    ------
    asyncio.CancelledError
        Re-raised after logging when the surrounding task is cancelled.
    """
    if not config.alpaca_api_key or not config.alpaca_secret_key:
        logger.warning(
            "Alpaca API credentials not configured — portfolio sync disabled"
        )
        return

    broker = AlpacaBroker(
        api_key=config.alpaca_api_key,
        secret_key=config.alpaca_secret_key,
        paper=config.paper_trading,
    )

    logger.info(
        "Portfolio sync started (interval=%ds, paper=%s)",
        config.snapshot_interval_seconds,
        config.paper_trading,
    )

    # The cancellation handler wraps the whole loop, including the sleep:
    # the task spends most of its time sleeping, so that is where a
    # cancellation usually lands, and the shutdown should still be logged.
    try:
        while True:
            try:
                if is_market_open():
                    await _sync_once(broker, session_factory)
                else:
                    logger.debug("Market closed — skipping portfolio snapshot")
            except Exception:
                # CancelledError derives from BaseException, so it is not
                # swallowed here and reaches the outer handler.
                logger.exception("Portfolio sync error — will retry next interval")

            await asyncio.sleep(config.snapshot_interval_seconds)
    except asyncio.CancelledError:
        logger.info("Portfolio sync task cancelled — shutting down")
        raise