feat(api): /api/meet-kevin/* routes (11 endpoints)

This commit is contained in:
Viktor Barzin 2026-05-21 19:53:16 +00:00
parent 8f5ee8f1c3
commit bfa7a503da
4 changed files with 840 additions and 0 deletions

View file

@ -92,6 +92,7 @@ def create_app(config: ApiGatewayConfig | None = None) -> FastAPI:
from services.api_gateway.routes.news import router as news_router
from services.api_gateway.routes.controls import router as controls_router
from services.api_gateway.routes.backtest import router as backtest_router
from services.api_gateway.routes.meet_kevin import router as meet_kevin_router
app.include_router(portfolio_router)
app.include_router(trades_router)
@ -100,6 +101,7 @@ def create_app(config: ApiGatewayConfig | None = None) -> FastAPI:
app.include_router(news_router)
app.include_router(controls_router)
app.include_router(backtest_router)
app.include_router(meet_kevin_router)
# WebSocket
from services.api_gateway.ws import router as ws_router

View file

@ -0,0 +1,665 @@
"""Meet Kevin pipeline endpoints — /api/meet-kevin/*."""
from __future__ import annotations
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import case, desc, func, select
from services.api_gateway.auth.middleware import get_current_user
from shared.models.meet_kevin import (
KevinAnalysis,
KevinChannel,
KevinStockMention,
KevinTickerAction,
KevinTranscript,
KevinVideo,
KevinVideoStatus,
)
router = APIRouter(prefix="/api/meet-kevin", tags=["meet-kevin"])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _video_summary_dict(video: KevinVideo, top_tickers: list, analysis: KevinAnalysis | None) -> dict:
return {
"id": video.id,
"youtube_video_id": video.youtube_video_id,
"title": video.title,
"published_at": video.published_at.isoformat() if video.published_at else None,
"thumbnail_url": video.thumbnail_url,
"status": video.status.value if video.status else None,
"failure_reason": video.failure_reason,
"retry_count": video.retry_count,
"top_tickers": top_tickers,
"outlook": analysis.market_outlook_direction.value if analysis else None,
"one_line_summary": (analysis.summary[:200] if analysis and analysis.summary else None),
}
def _analysis_dict(analysis: KevinAnalysis) -> dict:
return {
"id": analysis.id,
"model": analysis.model,
"prompt_version": analysis.prompt_version,
"market_outlook_direction": analysis.market_outlook_direction.value,
"market_outlook_reasoning": analysis.market_outlook_reasoning,
"macro_themes": analysis.macro_themes_json or [],
"key_risks": analysis.key_risks_json or [],
"summary": analysis.summary,
"prompt_tokens": analysis.prompt_tokens,
"completion_tokens": analysis.completion_tokens,
"cost_usd": float(analysis.cost_usd),
"created_at": analysis.created_at.isoformat(),
}
def _mention_dict(mention: KevinStockMention) -> dict:
return {
"id": mention.id,
"symbol": mention.symbol,
"action": mention.action.value,
"conviction": float(mention.conviction),
"time_horizon": mention.time_horizon.value,
"rationale_quote": mention.rationale_quote,
"video_timestamp_seconds": mention.video_timestamp_seconds,
"created_at": mention.created_at.isoformat(),
}
# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
@router.get("/health")
async def health(
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Pipeline health: status counts, daily cost, and last poll timestamp."""
db = request.app.state.db_session_factory
async with db() as session:
counts_by_status: dict[str, int] = {}
for status in KevinVideoStatus:
result = await session.execute(
select(func.count()).select_from(KevinVideo).where(KevinVideo.status == status)
)
counts_by_status[status.value] = result.scalar() or 0
# Daily cost
today_start = func.date_trunc("day", func.now())
cost_result = await session.execute(
select(func.sum(KevinAnalysis.cost_usd)).where(
KevinAnalysis.created_at >= today_start
)
)
daily_cost = float(cost_result.scalar() or 0.0)
# Last polled_at + cap from first channel
poll_result = await session.execute(
select(KevinChannel.last_polled_at).order_by(desc(KevinChannel.id)).limit(1)
)
last_polled_at = poll_result.scalar_one_or_none()
cap_result = await session.execute(
select(KevinChannel.daily_cost_cap_usd).order_by(desc(KevinChannel.id)).limit(1)
)
cap = cap_result.scalar_one_or_none()
daily_cost_cap = float(cap) if cap is not None else 5.0
return {
"counts_by_status": counts_by_status,
"daily_cost_usd": daily_cost,
"daily_cost_cap_usd": daily_cost_cap,
"cost_capped": daily_cost >= daily_cost_cap,
"last_polled_at": last_polled_at.isoformat() if last_polled_at else None,
}
# ---------------------------------------------------------------------------
# Channels
# ---------------------------------------------------------------------------
@router.get("/channels")
async def list_channels(
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""List all KevinChannel rows."""
db = request.app.state.db_session_factory
async with db() as session:
result = await session.execute(select(KevinChannel).order_by(KevinChannel.id))
channels = result.scalars().all()
return {
"channels": [
{
"id": ch.id,
"youtube_channel_id": ch.youtube_channel_id,
"title": ch.title,
"poll_enabled": ch.poll_enabled,
"poll_interval_seconds": ch.poll_interval_seconds,
"daily_cost_cap_usd": float(ch.daily_cost_cap_usd),
"last_polled_at": ch.last_polled_at.isoformat() if ch.last_polled_at else None,
"created_at": ch.created_at.isoformat(),
"updated_at": ch.updated_at.isoformat(),
}
for ch in channels
]
}
@router.patch("/channels/{channel_id}")
async def patch_channel(
channel_id: int,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Update poll_enabled, poll_interval_seconds, or daily_cost_cap_usd on a channel."""
body = await request.json()
allowed = {"poll_enabled", "poll_interval_seconds", "daily_cost_cap_usd"}
updates = {k: v for k, v in body.items() if k in allowed}
db = request.app.state.db_session_factory
async with db() as session:
result = await session.execute(
select(KevinChannel).where(KevinChannel.id == channel_id)
)
channel = result.scalar_one_or_none()
if channel is None:
raise HTTPException(status_code=404, detail="Channel not found")
for field, value in updates.items():
setattr(channel, field, value)
channel.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(channel)
return {
"id": channel.id,
"youtube_channel_id": channel.youtube_channel_id,
"title": channel.title,
"poll_enabled": channel.poll_enabled,
"poll_interval_seconds": channel.poll_interval_seconds,
"daily_cost_cap_usd": float(channel.daily_cost_cap_usd),
"last_polled_at": channel.last_polled_at.isoformat() if channel.last_polled_at else None,
}
# ---------------------------------------------------------------------------
# Videos
# ---------------------------------------------------------------------------
@router.get("/videos")
async def list_videos(
request: Request,
_user: dict = Depends(get_current_user),
status: str | None = Query(default=None),
q: str | None = Query(default=None),
page: int = Query(default=1, ge=1),
per_page: int = Query(default=20, ge=1, le=100),
) -> dict:
"""Paginated video feed ordered by published_at DESC.
Each item includes top_tickers (top 5 unique by conviction DESC) and
outlook/one_line_summary from the latest analysis row.
"""
db = request.app.state.db_session_factory
async with db() as session:
base = select(KevinVideo).order_by(desc(KevinVideo.published_at))
count_base = select(func.count()).select_from(KevinVideo)
if status:
try:
status_enum = KevinVideoStatus(status)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid status: {status}")
base = base.where(KevinVideo.status == status_enum)
count_base = count_base.where(KevinVideo.status == status_enum)
if q:
pattern = f"%{q}%"
base = base.where(KevinVideo.title.ilike(pattern))
count_base = count_base.where(KevinVideo.title.ilike(pattern))
total = (await session.execute(count_base)).scalar() or 0
offset = (page - 1) * per_page
result = await session.execute(base.offset(offset).limit(per_page))
videos = result.scalars().all()
# For each video, fetch the latest analysis and top tickers
video_rows = []
for video in videos:
analysis = None
top_tickers: list[dict] = []
if video.status == KevinVideoStatus.ANALYZED:
analysis_result = await session.execute(
select(KevinAnalysis)
.where(KevinAnalysis.video_id == video.id)
.order_by(desc(KevinAnalysis.created_at))
.limit(1)
)
analysis = analysis_result.scalar_one_or_none()
if analysis:
mentions_result = await session.execute(
select(KevinStockMention)
.where(KevinStockMention.analysis_id == analysis.id)
.order_by(desc(KevinStockMention.conviction))
.limit(5)
)
top_tickers = [
{"symbol": m.symbol, "action": m.action.value, "conviction": float(m.conviction)}
for m in mentions_result.scalars().all()
]
video_rows.append(_video_summary_dict(video, top_tickers, analysis))
return {
"videos": video_rows,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if per_page else 0,
}
@router.get("/videos/{video_id}")
async def get_video(
video_id: int,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Single video + latest analysis + first 5 ticker mentions by conviction DESC."""
db = request.app.state.db_session_factory
async with db() as session:
result = await session.execute(
select(KevinVideo).where(KevinVideo.id == video_id)
)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail="Video not found")
analysis = None
mentions: list[dict] = []
transcript_available = False
# Check transcript
trans_result = await session.execute(
select(KevinTranscript.id).where(KevinTranscript.video_id == video_id)
)
transcript_available = trans_result.scalar_one_or_none() is not None
# Latest analysis
analysis_result = await session.execute(
select(KevinAnalysis)
.where(KevinAnalysis.video_id == video_id)
.order_by(desc(KevinAnalysis.created_at))
.limit(1)
)
analysis = analysis_result.scalar_one_or_none()
if analysis:
mentions_result = await session.execute(
select(KevinStockMention)
.where(KevinStockMention.analysis_id == analysis.id)
.order_by(desc(KevinStockMention.conviction))
.limit(5)
)
mentions = [_mention_dict(m) for m in mentions_result.scalars().all()]
return {
"id": video.id,
"youtube_video_id": video.youtube_video_id,
"title": video.title,
"description": video.description,
"published_at": video.published_at.isoformat() if video.published_at else None,
"duration_seconds": video.duration_seconds,
"thumbnail_url": video.thumbnail_url,
"status": video.status.value if video.status else None,
"failure_reason": video.failure_reason,
"retry_count": video.retry_count,
"transcript_available": transcript_available,
"analysis": _analysis_dict(analysis) if analysis else None,
"top_mentions": mentions,
}
@router.get("/videos/{video_id}/transcript")
async def get_video_transcript(
video_id: int,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Return transcript segments_json + source + language; 404 if missing."""
db = request.app.state.db_session_factory
async with db() as session:
result = await session.execute(
select(KevinTranscript).where(KevinTranscript.video_id == video_id)
)
transcript = result.scalar_one_or_none()
if transcript is None:
raise HTTPException(status_code=404, detail="Transcript not found")
return {
"video_id": video_id,
"source": transcript.source.value,
"language": transcript.language,
"segments_json": transcript.segments_json,
"word_count": transcript.word_count,
}
@router.post("/videos/{video_id}/reprocess")
async def reprocess_video(
video_id: int,
request: Request,
_user: dict = Depends(get_current_user),
stage: str = Query(default="auto"),
) -> dict:
"""Reset video status to trigger reprocessing.
stage=captions|auto reset faileddiscovered (retry caption extraction)
stage=analysis reset captioned|failed|analyzedcaptioned (retry analysis)
"""
db = request.app.state.db_session_factory
async with db() as session:
result = await session.execute(
select(KevinVideo).where(KevinVideo.id == video_id)
)
video = result.scalar_one_or_none()
if video is None:
raise HTTPException(status_code=404, detail="Video not found")
current = video.status
if stage in ("captions", "auto"):
if current != KevinVideoStatus.FAILED:
raise HTTPException(
status_code=400,
detail=f"Cannot reprocess captions: video status is '{current.value}' (expected 'failed')",
)
video.status = KevinVideoStatus.DISCOVERED
video.retry_count = 0
video.failure_reason = None
elif stage == "analysis":
allowed = {KevinVideoStatus.CAPTIONED, KevinVideoStatus.FAILED, KevinVideoStatus.ANALYZED}
if current not in allowed:
raise HTTPException(
status_code=400,
detail=f"Cannot reprocess analysis: video status is '{current.value}'",
)
video.status = KevinVideoStatus.CAPTIONED
video.retry_count = 0
video.failure_reason = None
else:
raise HTTPException(status_code=400, detail=f"Unknown stage: {stage}")
await session.commit()
return {
"video_id": video_id,
"stage": stage,
"new_status": video.status.value,
}
# ---------------------------------------------------------------------------
# Stocks
# ---------------------------------------------------------------------------
@router.get("/stocks")
async def list_stocks(
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Distinct symbols with mention_count, last_seen_at, latest_action, latest_conviction, avg_conviction."""
db = request.app.state.db_session_factory
async with db() as session:
# Aggregate per symbol
agg = (
select(
KevinStockMention.symbol,
func.count().label("mention_count"),
func.max(KevinStockMention.created_at).label("last_seen_at"),
func.avg(KevinStockMention.conviction).label("avg_conviction"),
)
.group_by(KevinStockMention.symbol)
.order_by(desc(func.max(KevinStockMention.created_at)))
)
result = await session.execute(agg)
rows = result.all()
stocks = []
for row in rows:
# Latest mention for action + conviction
latest_result = await session.execute(
select(KevinStockMention)
.where(KevinStockMention.symbol == row.symbol)
.order_by(desc(KevinStockMention.created_at))
.limit(1)
)
latest = latest_result.scalar_one_or_none()
stocks.append({
"symbol": row.symbol,
"mention_count": row.mention_count,
"last_seen_at": row.last_seen_at.isoformat() if row.last_seen_at else None,
"latest_action": latest.action.value if latest else None,
"latest_conviction": float(latest.conviction) if latest else None,
"avg_conviction": float(row.avg_conviction) if row.avg_conviction else 0.0,
})
return {"stocks": stocks}
@router.get("/stocks/{symbol}")
async def get_stock(
symbol: str,
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""All mentions for a symbol joined with their videos, ordered by published_at DESC; 404 if none."""
symbol = symbol.upper()
db = request.app.state.db_session_factory
async with db() as session:
result = await session.execute(
select(KevinStockMention, KevinVideo)
.join(KevinVideo, KevinStockMention.video_id == KevinVideo.id)
.where(KevinStockMention.symbol == symbol)
.order_by(desc(KevinVideo.published_at))
)
rows = result.all()
if not rows:
raise HTTPException(status_code=404, detail=f"No mentions found for {symbol}")
return {
"symbol": symbol,
"mentions": [
{
"mention_id": mention.id,
"video_id": video.id,
"youtube_video_id": video.youtube_video_id,
"video_title": video.title,
"published_at": video.published_at.isoformat() if video.published_at else None,
"action": mention.action.value,
"conviction": float(mention.conviction),
"time_horizon": mention.time_horizon.value,
"rationale_quote": mention.rationale_quote,
"video_timestamp_seconds": mention.video_timestamp_seconds,
"created_at": mention.created_at.isoformat(),
}
for mention, video in rows
],
}
@router.get("/stocks/{symbol}/timeline")
async def get_stock_timeline(
symbol: str,
request: Request,
_user: dict = Depends(get_current_user),
bucket: str = Query(default="day"),
) -> dict:
"""date_trunc aggregation: avg_conviction, mention_count, net_action_score by day or week."""
if bucket not in ("day", "week"):
raise HTTPException(status_code=400, detail="bucket must be 'day' or 'week'")
symbol = symbol.upper()
db = request.app.state.db_session_factory
async with db() as session:
# net_action_score: +conviction for buy, -conviction for sell, 0 otherwise
net_score_expr = func.sum(
case(
(KevinStockMention.action == KevinTickerAction.BUY, KevinStockMention.conviction),
(KevinStockMention.action == KevinTickerAction.SELL, -KevinStockMention.conviction),
else_=0,
)
).label("net_action_score")
bucket_col = func.date_trunc(bucket, KevinStockMention.created_at).label("bucket_date")
timeline_query = (
select(
bucket_col,
func.avg(KevinStockMention.conviction).label("avg_conviction"),
func.count().label("mention_count"),
net_score_expr,
)
.where(KevinStockMention.symbol == symbol)
.group_by(bucket_col)
.order_by(bucket_col)
)
result = await session.execute(timeline_query)
rows = result.all()
return {
"symbol": symbol,
"bucket": bucket,
"timeline": [
{
"bucket_date": row.bucket_date.isoformat() if row.bucket_date else None,
"avg_conviction": float(row.avg_conviction) if row.avg_conviction else 0.0,
"mention_count": row.mention_count,
"net_action_score": float(row.net_action_score) if row.net_action_score else 0.0,
}
for row in rows
],
}
# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------
@router.get("/dashboard")
async def get_dashboard(
request: Request,
_user: dict = Depends(get_current_user),
) -> dict:
"""Aggregate home view: latest video + analysis, top conviction, 14d outlook trend."""
db = request.app.state.db_session_factory
async with db() as session:
# Latest analyzed video
video_result = await session.execute(
select(KevinVideo)
.where(KevinVideo.status == KevinVideoStatus.ANALYZED)
.order_by(desc(KevinVideo.published_at))
.limit(1)
)
latest_video = video_result.scalar_one_or_none()
latest_video_dict = None
latest_analysis_dict = None
top_mentions: list[dict] = []
if latest_video:
analysis_result = await session.execute(
select(KevinAnalysis)
.where(KevinAnalysis.video_id == latest_video.id)
.order_by(desc(KevinAnalysis.created_at))
.limit(1)
)
latest_analysis = analysis_result.scalar_one_or_none()
if latest_analysis:
latest_analysis_dict = _analysis_dict(latest_analysis)
mentions_result = await session.execute(
select(KevinStockMention)
.where(KevinStockMention.analysis_id == latest_analysis.id)
.order_by(desc(KevinStockMention.conviction))
.limit(5)
)
top_mentions = [_mention_dict(m) for m in mentions_result.scalars().all()]
latest_video_dict = {
"id": latest_video.id,
"youtube_video_id": latest_video.youtube_video_id,
"title": latest_video.title,
"published_at": latest_video.published_at.isoformat() if latest_video.published_at else None,
"thumbnail_url": latest_video.thumbnail_url,
}
# Top conviction last 7 days
seven_days_ago = func.now() - func.cast("7 days", type_=None)
top_conviction_result = await session.execute(
select(
KevinStockMention.symbol,
func.max(KevinStockMention.conviction).label("max_conviction"),
func.count().label("mention_count"),
)
.where(KevinStockMention.created_at >= func.now() - func.cast("7 days", type_=None))
.group_by(KevinStockMention.symbol)
.order_by(desc(func.max(KevinStockMention.conviction)))
.limit(10)
)
top_conviction = [
{
"symbol": row.symbol,
"max_conviction": float(row.max_conviction),
"mention_count": row.mention_count,
}
for row in top_conviction_result.all()
]
# 14-day outlook trend: date × direction → count
outlook_result = await session.execute(
select(
func.date_trunc("day", KevinAnalysis.created_at).label("day"),
KevinAnalysis.market_outlook_direction,
func.count().label("count"),
)
.where(KevinAnalysis.created_at >= func.now() - func.cast("14 days", type_=None))
.group_by(
func.date_trunc("day", KevinAnalysis.created_at),
KevinAnalysis.market_outlook_direction,
)
.order_by(func.date_trunc("day", KevinAnalysis.created_at))
)
outlook_trend = [
{
"day": row.day.isoformat() if row.day else None,
"direction": row.market_outlook_direction.value,
"count": row.count,
}
for row in outlook_result.all()
]
return {
"latest_video": latest_video_dict,
"latest_analysis": latest_analysis_dict,
"top_mentions": top_mentions,
"top_conviction_7d": top_conviction,
"outlook_trend_14d": outlook_trend,
}

View file

View file

@ -0,0 +1,173 @@
"""Smoke tests for /api/meet-kevin/* routes.
Three behaviors covered:
1. GET /api/meet-kevin/health 200 with counts_by_status, daily_cost_usd, daily_cost_cap_usd
2. GET /api/meet-kevin/videos (empty DB) {"videos": [], "total": 0, "page": 1, "per_page": 20}
3. GET /api/meet-kevin/stocks (empty DB) {"stocks": []}
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from fastapi.testclient import TestClient
from services.api_gateway.auth.middleware import get_current_user
from services.api_gateway.config import ApiGatewayConfig
from services.api_gateway.main import create_app
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def config() -> ApiGatewayConfig:
return ApiGatewayConfig(
jwt_secret_key="test-secret-meet-kevin",
database_url="sqlite+aiosqlite:///:memory:",
redis_url="redis://localhost:6379/0",
)
@pytest.fixture()
def mock_user() -> dict:
return {"sub": "user-test", "username": "tester", "type": "access"}
@pytest.fixture()
def mock_redis() -> AsyncMock:
redis = AsyncMock()
redis.get = AsyncMock(return_value=None)
redis.set = AsyncMock()
return redis
@pytest.fixture()
def mock_session():
"""Async context-manager session mock."""
session = AsyncMock()
session.__aenter__ = AsyncMock(return_value=session)
session.__aexit__ = AsyncMock(return_value=False)
return session
@pytest.fixture()
def mock_factory(mock_session):
factory = MagicMock(return_value=mock_session)
return factory
def _scalar_result(value):
"""Mock execute() result that returns a scalar."""
result = MagicMock()
result.scalar.return_value = value
result.scalar_one_or_none.return_value = value
result.scalars.return_value.all.return_value = []
result.all.return_value = []
return result
def _rows_result(rows):
"""Mock execute() result that returns rows."""
result = MagicMock()
result.all.return_value = rows
result.scalars.return_value.all.return_value = rows
result.scalar.return_value = len(rows)
result.scalar_one_or_none.return_value = rows[0] if rows else None
return result
@pytest.fixture()
def client(config, mock_user, mock_redis, mock_factory) -> TestClient:
app = create_app(config)
app.dependency_overrides[get_current_user] = lambda: mock_user
app.state.redis = mock_redis
app.state.db_session_factory = mock_factory
app.state.db_engine = MagicMock()
app.state.config = config
return TestClient(app, raise_server_exceptions=False)
# ---------------------------------------------------------------------------
# Test: GET /api/meet-kevin/health
# ---------------------------------------------------------------------------
class TestMeetKevinHealth:
def test_health_returns_required_keys(
self, client: TestClient, mock_session: AsyncMock
) -> None:
"""health returns counts_by_status, daily_cost_usd, daily_cost_cap_usd."""
# Health calls: count queries by status + daily cost sum + last_polled_at
# Return Nones/zeros for empty DB — any 4 sequential scalar calls
mock_session.execute = AsyncMock(
side_effect=[
_scalar_result(0), # discovered count
_scalar_result(0), # captioned count
_scalar_result(0), # analyzed count
_scalar_result(0), # failed count
_scalar_result(0), # skipped count
_scalar_result(None), # daily cost sum
_scalar_result(None), # last_polled_at
_scalar_result(None), # daily_cost_cap_usd
]
)
resp = client.get("/api/meet-kevin/health")
assert resp.status_code == 200, resp.text
data = resp.json()
assert "counts_by_status" in data
assert "daily_cost_usd" in data
assert "daily_cost_cap_usd" in data
# ---------------------------------------------------------------------------
# Test: GET /api/meet-kevin/videos (empty DB)
# ---------------------------------------------------------------------------
class TestMeetKevinVideos:
def test_videos_empty_db_returns_empty_page(
self, client: TestClient, mock_session: AsyncMock
) -> None:
"""GET /api/meet-kevin/videos on empty DB returns empty paginated response."""
# Two execute calls: count query + data query
mock_session.execute = AsyncMock(
side_effect=[
_scalar_result(0), # count
_rows_result([]), # data rows
]
)
resp = client.get("/api/meet-kevin/videos")
assert resp.status_code == 200, resp.text
data = resp.json()
assert data["videos"] == []
assert data["total"] == 0
assert data["page"] == 1
assert data["per_page"] == 20
# ---------------------------------------------------------------------------
# Test: GET /api/meet-kevin/stocks (empty DB)
# ---------------------------------------------------------------------------
class TestMeetKevinStocks:
def test_stocks_empty_db_returns_empty_list(
self, client: TestClient, mock_session: AsyncMock
) -> None:
"""GET /api/meet-kevin/stocks on empty DB returns {"stocks": []}."""
mock_session.execute = AsyncMock(
side_effect=[
_rows_result([]), # aggregate query
]
)
resp = client.get("/api/meet-kevin/stocks")
assert resp.status_code == 200, resp.text
data = resp.json()
assert data == {"stocks": []}