feat: add fundamental data providers (Alpha Vantage, FMP, Yahoo Finance) with rotation

This commit is contained in:
Viktor Barzin 2026-02-23 21:41:16 +00:00
parent 2398e8faf6
commit aa47e896dd
No known key found for this signature in database
GPG key ID: 0EB088298288D958
7 changed files with 704 additions and 0 deletions

View file

@ -0,0 +1,6 @@
"""Fundamental data providers for stock financial metrics."""
from shared.fundamentals.base import FundamentalsProvider
from shared.fundamentals.rotating import RotatingProvider
__all__ = ["FundamentalsProvider", "RotatingProvider"]

View file

@ -0,0 +1,79 @@
"""Alpha Vantage fundamental data provider."""
from __future__ import annotations
import logging
import httpx
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
_BASE_URL = "https://www.alphavantage.co/query"
def _safe_float(val: str | None) -> float | None:
"""Convert a string value to float, returning None for missing/invalid values."""
if val is None or val in ("None", "-", ""):
return None
try:
return float(val)
except (ValueError, TypeError):
return None
def _safe_float_div100(val: str | None) -> float | None:
"""Convert a string value to float and divide by 100 (e.g. D/E ratio)."""
f = _safe_float(val)
return f / 100.0 if f is not None else None
class AlphaVantageProvider(FundamentalsProvider):
"""Fetches fundamental data from the Alpha Vantage OVERVIEW endpoint."""
def __init__(self, api_key: str, timeout: float = 10.0) -> None:
self._api_key = api_key
self._client = httpx.AsyncClient(timeout=timeout)
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
"""Fetch company overview from Alpha Vantage."""
try:
resp = await self._client.get(
_BASE_URL,
params={
"function": "OVERVIEW",
"symbol": ticker,
"apikey": self._api_key,
},
)
resp.raise_for_status()
data = resp.json()
# Rate-limit / error detection
if "Note" in data or "Information" in data:
msg = data.get("Note") or data.get("Information", "")
logger.warning("Alpha Vantage rate limit for %s: %s", ticker, msg)
return None
# An empty or error response won't have "Symbol"
if "Symbol" not in data:
logger.warning("Alpha Vantage returned no data for %s", ticker)
return None
return FundamentalsSnapshot(
ticker=ticker,
eps=_safe_float(data.get("EPS")),
pe_ratio=_safe_float(data.get("PERatio")),
peg_ratio=_safe_float(data.get("PEGRatio")),
revenue_growth=_safe_float(data.get("QuarterlyRevenueGrowthYOY")),
profit_margin=_safe_float(data.get("ProfitMargin")),
debt_to_equity=_safe_float_div100(data.get("DebtToEquity")),
market_cap=_safe_float(data.get("MarketCapitalization")),
)
except Exception:
logger.exception("Alpha Vantage fetch failed for %s", ticker)
return None
async def close(self) -> None:
await self._client.aclose()

View file

@ -0,0 +1,47 @@
"""Abstract base class for fundamental data providers."""
from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import UTC, datetime
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# FundamentalsSnapshot — canonical schema
#
# NOTE: This is the authoritative definition until it is moved into
# ``shared.schemas.trading``. Once the parallel schema task lands, delete
# this class and update all imports to point at the schemas module.
# ---------------------------------------------------------------------------
class FundamentalsSnapshot(BaseModel):
"""Point-in-time snapshot of a stock's fundamental financial metrics."""
ticker: str
eps: float | None = None
pe_ratio: float | None = None
peg_ratio: float | None = None
revenue_growth: float | None = None
profit_margin: float | None = None
debt_to_equity: float | None = None
market_cap: float | None = None
fetched_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
model_config = {"from_attributes": True}
# ---------------------------------------------------------------------------
# Abstract provider
# ---------------------------------------------------------------------------
class FundamentalsProvider(ABC):
"""Abstract base class for fundamental data providers."""
@abstractmethod
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
"""Fetch fundamental data for *ticker*. Returns None on failure/rate limit."""
...

View file

@ -0,0 +1,64 @@
"""Financial Modeling Prep (FMP) fundamental data provider."""
from __future__ import annotations
import logging
import httpx
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
_BASE_URL = "https://financialmodelingprep.com/api/v3/key-metrics-ttm"
def _safe_float(val: object) -> float | None:
"""Convert a value to float, returning None for missing/invalid values."""
if val is None:
return None
try:
return float(val)
except (ValueError, TypeError):
return None
class FMPProvider(FundamentalsProvider):
"""Fetches fundamental data from Financial Modeling Prep key-metrics-ttm endpoint."""
def __init__(self, api_key: str, timeout: float = 10.0) -> None:
self._api_key = api_key
self._client = httpx.AsyncClient(timeout=timeout)
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
"""Fetch TTM key metrics from FMP."""
try:
resp = await self._client.get(
f"{_BASE_URL}/{ticker}",
params={"apikey": self._api_key},
)
resp.raise_for_status()
data = resp.json()
if not isinstance(data, list) or len(data) == 0:
logger.warning("FMP returned empty response for %s", ticker)
return None
item = data[0]
return FundamentalsSnapshot(
ticker=ticker,
eps=_safe_float(item.get("netIncomePerShareTTM")),
pe_ratio=_safe_float(item.get("peRatioTTM")),
peg_ratio=_safe_float(item.get("pegRatioTTM")),
revenue_growth=_safe_float(item.get("revenueGrowth")),
profit_margin=_safe_float(item.get("netProfitMarginTTM")),
debt_to_equity=_safe_float(item.get("debtToEquityTTM")),
market_cap=_safe_float(item.get("marketCapTTM")),
)
except Exception:
logger.exception("FMP fetch failed for %s", ticker)
return None
async def close(self) -> None:
await self._client.aclose()

View file

@ -0,0 +1,50 @@
"""Rotating (fallback) wrapper over multiple fundamental data providers."""
from __future__ import annotations
import logging
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
class RotatingProvider(FundamentalsProvider):
"""Tries each provider in order and returns the first successful result.
If a provider raises an exception it is caught, logged, and the next
provider is attempted. Returns ``None`` only when every provider either
returned ``None`` or raised.
"""
def __init__(self, providers: list[FundamentalsProvider]) -> None:
if not providers:
raise ValueError("RotatingProvider requires at least one provider")
self._providers = providers
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
"""Try each provider in sequence; return the first non-None result."""
for provider in self._providers:
provider_name = type(provider).__name__
try:
result = await provider.fetch(ticker)
if result is not None:
logger.info(
"Provider %s succeeded for %s",
provider_name,
ticker,
)
return result
logger.debug(
"Provider %s returned None for %s, trying next",
provider_name,
ticker,
)
except Exception:
logger.exception(
"Provider %s raised for %s, trying next",
provider_name,
ticker,
)
logger.warning("All providers exhausted for %s", ticker)
return None

View file

@ -0,0 +1,61 @@
"""Yahoo Finance fundamental data provider (via yfinance)."""
from __future__ import annotations
import asyncio
import logging
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
def _safe_float(val: object) -> float | None:
"""Convert a value to float, returning None for missing/invalid values."""
if val is None:
return None
try:
return float(val)
except (ValueError, TypeError):
return None
def _safe_float_div100(val: object) -> float | None:
"""Convert a value to float and divide by 100 (e.g. D/E ratio)."""
f = _safe_float(val)
return f / 100.0 if f is not None else None
def _fetch_info(ticker: str) -> dict:
"""Synchronous helper — runs in a thread pool to avoid blocking the loop."""
import yfinance # noqa: local import to avoid hard dependency
return yfinance.Ticker(ticker).info
class YahooFinanceProvider(FundamentalsProvider):
"""Fetches fundamental data from Yahoo Finance via the yfinance library."""
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
"""Fetch ticker info from Yahoo Finance in a thread pool."""
try:
loop = asyncio.get_running_loop()
info = await loop.run_in_executor(None, _fetch_info, ticker)
if not info:
logger.warning("Yahoo Finance returned empty info for %s", ticker)
return None
return FundamentalsSnapshot(
ticker=ticker,
eps=_safe_float(info.get("trailingEps")),
pe_ratio=_safe_float(info.get("trailingPE")),
peg_ratio=_safe_float(info.get("pegRatio")),
revenue_growth=_safe_float(info.get("revenueGrowth")),
profit_margin=_safe_float(info.get("profitMargins")),
debt_to_equity=_safe_float_div100(info.get("debtToEquity")),
market_cap=_safe_float(info.get("marketCap")),
)
except Exception:
logger.exception("Yahoo Finance fetch failed for %s", ticker)
return None