trading/services/sentiment_analyzer/main.py
Viktor Barzin 99b041a0dd
fix: flush article before creating sentiment FK references
ArticleSentiment referenced db_article.id before the Article was flushed,
causing a NotNullViolationError on the article_id column. Adding an explicit
flush after session.add(db_article) ensures the UUID is populated before
creating the foreign key reference.
2026-02-23 20:57:11 +00:00

"""Sentiment Analyzer service — main entry point.
Consumes ``news:raw`` articles from Redis Streams, scores them using a
tiered approach (FinBERT first, Ollama fallback for low-confidence results),
extracts ticker mentions, and publishes ``ScoredArticle`` messages to
``news:scored``. Also persists scored articles to the database (articles +
article_sentiments tables) so the dashboard can display real data.
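
Run with (assuming the repository root, ``trading/``, is on ``PYTHONPATH``,
as the import paths below suggest)::

    python -m services.sentiment_analyzer.main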
"""
from __future__ import annotations

import asyncio
import logging
import signal
import time

from redis.asyncio import Redis
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import async_sessionmaker

from services.sentiment_analyzer.analyzers.finbert import FinBERTAnalyzer
from services.sentiment_analyzer.analyzers.ollama_analyzer import OllamaAnalyzer
from services.sentiment_analyzer.config import SentimentAnalyzerConfig
from services.sentiment_analyzer.ticker_extractor import extract_tickers
from shared.db import create_db
from shared.models.news import Article, ArticleSentiment
from shared.redis_streams import StreamConsumer, StreamPublisher
from shared.schemas.news import RawArticle, ScoredArticle
from shared.telemetry import setup_telemetry

logger = logging.getLogger(__name__)


async def process_article(
    article: RawArticle,
    finbert: FinBERTAnalyzer,
    ollama: OllamaAnalyzer,
    publisher: StreamPublisher,
    config: SentimentAnalyzerConfig,
    counters: dict,
    db_session_factory: async_sessionmaker | None = None,
) -> None:
"""Score a single article and publish one ScoredArticle per extracted ticker.
Parameters
----------
article:
The raw article consumed from the ``news:raw`` stream.
finbert:
FinBERT analyzer instance.
ollama:
Ollama analyzer instance (used as fallback).
publisher:
Publishes results to ``news:scored``.
config:
Service configuration (confidence threshold, etc.).
counters:
Dict of OpenTelemetry counter/histogram instruments.
db_session_factory:
Optional async session factory for persisting to the DB.
When ``None``, DB persistence is skipped (backward compatible).
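
    Examples
    --------
    A minimal invocation sketch; the field values are illustrative and
    ``now`` is assumed to be a timezone-aware ``datetime``::

        article = RawArticle(
            source="example-feed",
            url="https://example.com/story",
            title="ACME Corp beats earnings estimates",
            content="ACME Corp reported ...",
            published_at=now,
            fetched_at=now,
            content_hash="deadbeef",
        )
        await process_article(
            article, finbert, ollama, publisher, config, counters
        )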
"""
    start = time.monotonic()

    # --- Step 1: Run FinBERT ---
    score, confidence = await finbert.analyze(article.title, article.content)
    model_used = "finbert"
    counters["finbert_count"].add(1)

    # --- Step 2: Fall back to Ollama if confidence is too low ---
    if confidence < config.finbert_confidence_threshold:
        logger.info(
            "FinBERT confidence %.2f below threshold %.2f — falling back to Ollama",
            confidence,
            config.finbert_confidence_threshold,
        )
        score, confidence = await ollama.analyze(article.title, article.content)
        model_used = "ollama"
        counters["ollama_count"].add(1)

    elapsed = time.monotonic() - start
    counters["inference_latency"].record(elapsed)

    # --- Step 3: Extract tickers ---
    combined_text = f"{article.title} {article.content}"
    tickers = extract_tickers(combined_text)
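    # extract_tickers() is assumed to return a list of symbols, e.g.
    # ["AAPL", "TSLA"]; an empty list means nothing recognisable was mentioned.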
    if not tickers:
        logger.debug("No tickers found in article: %s", article.title[:80])
        # Still count the article as scored even if no tickers found.
        counters["articles_scored"].add(1)
        return

    # --- Step 4: Publish one ScoredArticle per ticker ---
    for ticker in tickers:
        scored = ScoredArticle(
            source=article.source,
            url=article.url,
            title=article.title,
            content=article.content,
            published_at=article.published_at,
            fetched_at=article.fetched_at,
            content_hash=article.content_hash,
            ticker=ticker,
            sentiment_score=score,
            confidence=confidence,
            model_used=model_used,
            entities=tickers,
        )
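        # model_dump(mode="json") serialises datetimes (and similar types) to
        # JSON-safe strings before the message is published to the stream.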
        await publisher.publish(scored.model_dump(mode="json"))
        logger.debug("Published scored article for %s (score=%.2f)", ticker, score)

    counters["articles_scored"].add(1)

    # --- Step 5: Persist to DB ---
    if db_session_factory is not None:
        try:
            async with db_session_factory() as session:
                db_article = Article(
                    source=article.source,
                    url=article.url,
                    title=article.title,
                    published_at=article.published_at,
                    fetched_at=article.fetched_at,
                    content_hash=article.content_hash,
                )
                session.add(db_article)
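                # Flush so the database assigns db_article.id before it is
                # referenced below; without this the ArticleSentiment rows are
                # created with a NULL article_id and the insert fails with a
                # NotNullViolationError (the bug this commit fixes).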
                await session.flush()
                for ticker in tickers:
                    sentiment = ArticleSentiment(
                        article_id=db_article.id,
                        ticker=ticker,
                        score=score,
                        confidence=confidence,
                        model_used=model_used,
                    )
                    session.add(sentiment)
                await session.commit()
                logger.debug(
                    "Persisted article '%s' with %d sentiments to DB",
                    article.title[:60],
                    len(tickers),
                )
        except IntegrityError:
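            # Assumed duplicate: the log message below suggests a uniqueness
            # constraint on content_hash, so the article is already persisted.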
            logger.debug(
                "Article already exists in DB (content_hash=%s), skipping",
                article.content_hash,
            )
        except Exception:
            logger.exception(
                "Failed to persist article to DB: %s", article.title[:60]
            )


async def run(config: SentimentAnalyzerConfig | None = None) -> None:
    """Main service loop.

    Connects to Redis, initialises analyzers and telemetry, then
    continuously consumes from ``news:raw`` and publishes to ``news:scored``.
    """
    if config is None:
        config = SentimentAnalyzerConfig()

    logging.basicConfig(level=config.log_level)
    logger.info("Starting Sentiment Analyzer service")

    # --- Telemetry ---
    meter = setup_telemetry("sentiment-analyzer", config.otel_metrics_port)
    counters = {
        "articles_scored": meter.create_counter(
            "articles_scored",
            description="Total articles scored by the sentiment analyzer",
        ),
        "finbert_count": meter.create_counter(
            "finbert_count",
            description="Number of articles scored by FinBERT",
        ),
        "ollama_count": meter.create_counter(
            "ollama_count",
            description="Number of articles scored by Ollama (fallback)",
        ),
        "inference_latency": meter.create_histogram(
            "inference_latency_seconds",
            description="Time spent on sentiment inference per article",
            unit="s",
        ),
    }

    # --- Redis ---
    redis = Redis.from_url(config.redis_url, decode_responses=False)
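    # decode_responses=False keeps stream payloads as raw bytes; decoding is
    # assumed to happen inside StreamConsumer / Pydantic validation.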
    consumer = StreamConsumer(redis, "news:raw", "sentiment-analyzer", "worker-1")
    publisher = StreamPublisher(redis, "news:scored")

    # --- Analyzers ---
    finbert = FinBERTAnalyzer(
        model_name=config.finbert_model,
        max_content_length=config.max_content_length,
    )
    ollama = OllamaAnalyzer(model=config.ollama_model, host=config.ollama_host)

    # --- Database ---
    db_session_factory = None
    try:
        _engine, db_session_factory = create_db(config)
        logger.info("Database session factory initialised")
    except Exception:
        logger.exception("Failed to initialise DB — articles will NOT be persisted")

    logger.info("Consuming from news:raw, publishing to news:scored")

    # Graceful shutdown on SIGTERM/SIGINT.
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown_event.set)
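    # Note: loop.add_signal_handler is Unix-only; on Windows event loops it
    # raises NotImplementedError.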

    # --- Consume loop ---
    try:
        async for _msg_id, data in consumer.consume():
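            # Shutdown is only observed between messages: the loop blocks in
            # consume() until the next entry arrives, and an in-flight article
            # is always processed to completion.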
            if shutdown_event.is_set():
                break
            try:
                article = RawArticle.model_validate(data)
                await process_article(
                    article,
                    finbert,
                    ollama,
                    publisher,
                    config,
                    counters,
                    db_session_factory,
                )
            except Exception:
                logger.exception(
                    "Error processing article: %s", data.get("title", "<unknown>")
                )
    finally:
        await redis.aclose()
        logger.info("Sentiment analyzer stopped gracefully")


def main() -> None:
    """CLI entry point."""
    asyncio.run(run())


if __name__ == "__main__":
    main()