trading/services/api_gateway/routes/controls.py

95 lines
2.6 KiB
Python

"""Control endpoints — pause/resume trading, force close positions."""
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import BaseModel
from services.api_gateway.auth.middleware import get_current_user
# Router for the trading-control endpoints; every route declared in this
# module is registered under the "/api/controls" prefix with the "controls" tag.
router = APIRouter(prefix="/api/controls", tags=["controls"])
# Redis key acting as the pause flag: /pause sets it to "1", /resume deletes
# it, and /status reports "paused" whenever it holds a truthy value.
TRADING_PAUSED_KEY = "trading:paused"
class ClosePositionRequest(BaseModel):
    """Request payload accepted by the force-close-position endpoint."""

    # Symbol of the position to close; the endpoint upper-cases it before
    # querying and publishing.
    ticker: str
@router.post("/pause")
async def pause_trading(
    request: Request,
    _user: dict = Depends(get_current_user),
) -> dict:
    """Pause trading by writing the pause flag to Redis.

    Args:
        request: Incoming request; the Redis client is read from
            ``request.app.state.redis``.
        _user: Value resolved by the ``get_current_user`` dependency. It is
            not used in the body; it is declared so the dependency runs.

    Returns:
        ``{"status": "paused"}`` once the flag has been written.
    """
    # The flag's value is always "1"; readers only test it for truthiness.
    await request.app.state.redis.set(TRADING_PAUSED_KEY, "1")
    return {"status": "paused"}
@router.post("/resume")
async def resume_trading(
    request: Request,
    _user: dict = Depends(get_current_user),
) -> dict:
    """Resume trading by removing the pause flag from Redis.

    Args:
        request: Incoming request; the Redis client is read from
            ``request.app.state.redis``.
        _user: Value resolved by the ``get_current_user`` dependency. It is
            not used in the body; it is declared so the dependency runs.

    Returns:
        ``{"status": "active"}`` after the delete call completes, whether or
        not the key existed beforehand.
    """
    await request.app.state.redis.delete(TRADING_PAUSED_KEY)
    return {"status": "active"}
@router.post("/close-position")
async def close_position(
    body: ClosePositionRequest,
    request: Request,
    _user: dict = Depends(get_current_user),
) -> dict:
    """Request a forced close of the position matching a ticker.

    The ticker from the request body is upper-cased and used to look up a
    ``Position`` row. When one is found, a JSON command carrying the ticker
    and the row's ``qty`` is published on the ``controls:close_position``
    Redis channel, where the trade executor is expected to pick it up
    asynchronously. The endpoint itself does not close anything.

    Args:
        body: Payload holding the ticker to close.
        request: Incoming request; the DB session factory and Redis client
            are read from ``request.app.state``.
        _user: Value resolved by the ``get_current_user`` dependency. It is
            not used in the body; it is declared so the dependency runs.

    Returns:
        ``{"status": "close_requested", "ticker": <upper-cased ticker>,
        "qty": <position qty>}``.

    Raises:
        HTTPException: 404 when no ``Position`` row matches the ticker.
    """
    import json

    from sqlalchemy import select

    from shared.models.trading import Position

    # Normalise once; the same symbol is used for the query, the published
    # command, and the response.
    symbol = body.ticker.upper()

    # NOTE(review): the lookup filters on ticker only. Confirm the table
    # holds at most one (open) row per ticker, because scalar_one_or_none()
    # raises when several rows match.
    lookup = select(Position).where(Position.ticker == symbol)

    session_factory = request.app.state.db_session_factory
    async with session_factory() as session:
        result = await session.execute(lookup)
        position = result.scalar_one_or_none()

    if position is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No open position for {body.ticker}",
        )

    # The command sent to the executor doubles as the tail of the response.
    command = {"ticker": symbol, "qty": position.qty}
    await request.app.state.redis.publish(
        "controls:close_position",
        json.dumps(command),
    )
    return {"status": "close_requested", **command}
@router.get("/status")
async def get_trading_status(
    request: Request,
    _user: dict = Depends(get_current_user),
) -> dict:
    """Report whether trading is currently paused or active.

    Args:
        request: Incoming request; the Redis client is read from
            ``request.app.state.redis``.
        _user: Value resolved by the ``get_current_user`` dependency. It is
            not used in the body; it is declared so the dependency runs.

    Returns:
        ``{"status": "paused"}`` when the pause flag holds a truthy value,
        otherwise ``{"status": "active"}``.
    """
    flag = await request.app.state.redis.get(TRADING_PAUSED_KEY)
    if flag:
        return {"status": "paused"}
    return {"status": "active"}