113 lines
3.9 KiB
Python
113 lines
3.9 KiB
Python
"""FinBERT-based financial sentiment analyzer.
|
|
|
|
Uses the ProsusAI/finbert model via the HuggingFace transformers library
|
|
to classify article text as positive, negative, or neutral.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
import logging
import threading
from typing import Any
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FinBERTAnalyzer:
    """Lazy-loading wrapper around a transformers sentiment-analysis pipeline.

    The heavy ``transformers`` import and the model load happen only once, on
    the first call to :meth:`analyze`. Both the load and the inference run in
    the event loop's default executor, so the loop itself is not blocked.
    """

    def __init__(self, model_name: str = "ProsusAI/finbert", max_content_length: int = 512) -> None:
        """Store the configuration; the model is not loaded here.

        Parameters
        ----------
        model_name:
            Model identifier passed to ``transformers.pipeline``.
        max_content_length:
            Maximum number of tokens passed to the model per article.
        """
        self.model_name = model_name
        self.max_content_length = max_content_length
        self._pipeline: Any | None = None
        # ``_load_pipeline`` runs on executor threads, so two concurrent first
        # calls could otherwise both build the (large) model.
        self._load_lock = threading.Lock()

    def _load_pipeline(self) -> Any:
        """Return the transformers pipeline, building it on first use.

        May be called from several threads at once; only one of them builds
        the pipeline, the others wait and reuse it.
        """
        if self._pipeline is None:
            with self._load_lock:
                # Re-check under the lock: another thread may have finished
                # loading while this one was waiting.
                if self._pipeline is None:
                    from transformers import pipeline  # type: ignore[import-untyped]

                    logger.info("Loading FinBERT model: %s", self.model_name)
                    self._pipeline = pipeline(
                        "sentiment-analysis",
                        model=self.model_name,
                        # ``top_k=None`` requests the score of every label; it
                        # replaces the deprecated ``return_all_scores=True``.
                        top_k=None,
                    )
                    logger.info("FinBERT model loaded successfully")
        return self._pipeline

    async def analyze(self, title: str, content: str) -> tuple[float, float]:
        """Score the sentiment of an article.

        Parameters
        ----------
        title:
            Article headline.
        content:
            Article body text.

        Returns
        -------
        tuple[float, float]
            ``(sentiment_score, confidence)`` where *sentiment_score* is in
            ``[-1.0, 1.0]`` and *confidence* is in ``[0.0, 1.0]``.

        The input text is truncated to ``max_content_length`` tokens by the
        pipeline's tokenizer (``truncation=True``).
        """
        # Join only the non-empty parts so a missing headline or body does not
        # leave a stray ". " in the text sent to the model.
        text = ". ".join(part for part in (title, content) if part)

        # Cheap character-level pre-cut (rough estimate: 1 token ~ 4 chars for
        # English) so enormous strings are never handed to the tokenizer; the
        # tokenizer still performs the precise token-level truncation.
        max_length = self.max_content_length
        text = text[: max_length * 4]

        def _infer() -> Any:
            # Runs on an executor thread: the (possibly very slow) first model
            # load and the blocking inference both stay off the event loop.
            pipe = self._load_pipeline()
            return pipe(text, truncation=True, max_length=max_length)

        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(None, _infer)

        return self._parse_scores(results)

    @staticmethod
    def _parse_scores(results: list[Any]) -> tuple[float, float]:
        """Map pipeline output to ``(score, confidence)``.

        Accepts either the nested shape
        ``[[{"label": "positive", "score": 0.85}, ...]]`` or the flat shape
        ``[{"label": "positive", "score": 0.85}, ...]``. Empty output yields
        ``(0.0, 0.0)``.

        Mapping:
        - ``"positive"`` -> +1
        - ``"negative"`` -> -1
        - ``"neutral"``  ->  0

        The sentiment score is the sum of label polarities weighted by their
        reported scores; labels outside the mapping contribute nothing.
        Confidence is the largest reported score.
        """
        label_map = {"positive": 1.0, "negative": -1.0, "neutral": 0.0}

        if not results:
            return 0.0, 0.0

        # A dict in first position means the per-label entries were returned
        # directly; otherwise the first element is the list of entries for
        # the single input text.
        first = results[0]
        scores = results if isinstance(first, dict) else first

        sentiment_score = 0.0
        confidence = 0.0
        for entry in scores:
            label = entry["label"].lower()
            prob = entry["score"]
            sentiment_score += label_map.get(label, 0.0) * prob
            if prob > confidence:
                confidence = prob

        return sentiment_score, confidence
|