feat: sentiment analyzer — FinBERT + Ollama tiered analysis
This commit is contained in:
parent
9f46071502
commit
6952a829ae
11 changed files with 976 additions and 1 deletions
169
services/sentiment_analyzer/main.py
Normal file
169
services/sentiment_analyzer/main.py
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
"""Sentiment Analyzer service — main entry point.
|
||||
|
||||
Consumes ``news:raw`` articles from Redis Streams, scores them using a
|
||||
tiered approach (FinBERT first, Ollama fallback for low-confidence results),
|
||||
extracts ticker mentions, and publishes ``ScoredArticle`` messages to
|
||||
``news:scored``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
from redis.asyncio import Redis
|
||||
|
||||
from services.sentiment_analyzer.analyzers.finbert import FinBERTAnalyzer
|
||||
from services.sentiment_analyzer.analyzers.ollama_analyzer import OllamaAnalyzer
|
||||
from services.sentiment_analyzer.config import SentimentAnalyzerConfig
|
||||
from services.sentiment_analyzer.ticker_extractor import extract_tickers
|
||||
from shared.redis_streams import StreamConsumer, StreamPublisher
|
||||
from shared.schemas.news import RawArticle, ScoredArticle
|
||||
from shared.telemetry import setup_telemetry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def process_article(
|
||||
article: RawArticle,
|
||||
finbert: FinBERTAnalyzer,
|
||||
ollama: OllamaAnalyzer,
|
||||
publisher: StreamPublisher,
|
||||
config: SentimentAnalyzerConfig,
|
||||
counters: dict,
|
||||
) -> None:
|
||||
"""Score a single article and publish one ScoredArticle per extracted ticker.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
article:
|
||||
The raw article consumed from the ``news:raw`` stream.
|
||||
finbert:
|
||||
FinBERT analyzer instance.
|
||||
ollama:
|
||||
Ollama analyzer instance (used as fallback).
|
||||
publisher:
|
||||
Publishes results to ``news:scored``.
|
||||
config:
|
||||
Service configuration (confidence threshold, etc.).
|
||||
counters:
|
||||
Dict of OpenTelemetry counter/histogram instruments.
|
||||
"""
|
||||
start = time.monotonic()
|
||||
|
||||
# --- Step 1: Run FinBERT ---
|
||||
score, confidence = await finbert.analyze(article.title, article.content)
|
||||
model_used = "finbert"
|
||||
counters["finbert_count"].add(1)
|
||||
|
||||
# --- Step 2: Fallback to Ollama if confidence is too low ---
|
||||
if confidence < config.finbert_confidence_threshold:
|
||||
logger.info(
|
||||
"FinBERT confidence %.2f below threshold %.2f — falling back to Ollama",
|
||||
confidence,
|
||||
config.finbert_confidence_threshold,
|
||||
)
|
||||
score, confidence = await ollama.analyze(article.title, article.content)
|
||||
model_used = "ollama"
|
||||
counters["ollama_count"].add(1)
|
||||
|
||||
elapsed = time.monotonic() - start
|
||||
counters["inference_latency"].record(elapsed)
|
||||
|
||||
# --- Step 3: Extract tickers ---
|
||||
combined_text = f"{article.title} {article.content}"
|
||||
tickers = extract_tickers(combined_text)
|
||||
|
||||
if not tickers:
|
||||
logger.debug("No tickers found in article: %s", article.title[:80])
|
||||
# Still count the article as scored even if no tickers found.
|
||||
counters["articles_scored"].add(1)
|
||||
return
|
||||
|
||||
# --- Step 4: Publish one ScoredArticle per ticker ---
|
||||
for ticker in tickers:
|
||||
scored = ScoredArticle(
|
||||
source=article.source,
|
||||
url=article.url,
|
||||
title=article.title,
|
||||
content=article.content,
|
||||
published_at=article.published_at,
|
||||
fetched_at=article.fetched_at,
|
||||
content_hash=article.content_hash,
|
||||
ticker=ticker,
|
||||
sentiment_score=score,
|
||||
confidence=confidence,
|
||||
model_used=model_used,
|
||||
entities=tickers,
|
||||
)
|
||||
await publisher.publish(scored.model_dump(mode="json"))
|
||||
logger.debug("Published scored article for %s (score=%.2f)", ticker, score)
|
||||
|
||||
counters["articles_scored"].add(1)
|
||||
|
||||
|
||||
async def run(config: SentimentAnalyzerConfig | None = None) -> None:
|
||||
"""Main service loop.
|
||||
|
||||
Connects to Redis, initialises analysers and telemetry, then
|
||||
continuously consumes from ``news:raw`` and publishes to ``news:scored``.
|
||||
"""
|
||||
if config is None:
|
||||
config = SentimentAnalyzerConfig()
|
||||
|
||||
logging.basicConfig(level=config.log_level)
|
||||
logger.info("Starting Sentiment Analyzer service")
|
||||
|
||||
# --- Telemetry ---
|
||||
meter = setup_telemetry("sentiment-analyzer", config.otel_metrics_port)
|
||||
counters = {
|
||||
"articles_scored": meter.create_counter(
|
||||
"articles_scored",
|
||||
description="Total articles scored by the sentiment analyzer",
|
||||
),
|
||||
"finbert_count": meter.create_counter(
|
||||
"finbert_count",
|
||||
description="Number of articles scored by FinBERT",
|
||||
),
|
||||
"ollama_count": meter.create_counter(
|
||||
"ollama_count",
|
||||
description="Number of articles scored by Ollama (fallback)",
|
||||
),
|
||||
"inference_latency": meter.create_histogram(
|
||||
"inference_latency_seconds",
|
||||
description="Time spent on sentiment inference per article",
|
||||
unit="s",
|
||||
),
|
||||
}
|
||||
|
||||
# --- Redis ---
|
||||
redis = Redis.from_url(config.redis_url, decode_responses=False)
|
||||
consumer = StreamConsumer(redis, "news:raw", "sentiment-analyzer", "worker-1")
|
||||
publisher = StreamPublisher(redis, "news:scored")
|
||||
|
||||
# --- Analyzers ---
|
||||
finbert = FinBERTAnalyzer(
|
||||
model_name=config.finbert_model,
|
||||
max_content_length=config.max_content_length,
|
||||
)
|
||||
ollama = OllamaAnalyzer(model=config.ollama_model, host=config.ollama_host)
|
||||
|
||||
logger.info("Consuming from news:raw, publishing to news:scored")
|
||||
|
||||
# --- Consume loop ---
|
||||
async for _msg_id, data in consumer.consume():
|
||||
try:
|
||||
article = RawArticle.model_validate(data)
|
||||
await process_article(article, finbert, ollama, publisher, config, counters)
|
||||
except Exception:
|
||||
logger.exception("Error processing article: %s", data.get("title", "<unknown>"))
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""CLI entry point."""
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue