feat: news fetcher service — RSS and Reddit sources
This commit is contained in:
parent
9f46071502
commit
90b52a5144
10 changed files with 722 additions and 2 deletions
0
services/__init__.py
Normal file
0
services/__init__.py
Normal file
1
services/news_fetcher/__init__.py
Normal file
1
services/news_fetcher/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
"""News fetcher service — polls RSS feeds and Reddit for financial news."""
|
||||
28
services/news_fetcher/config.py
Normal file
28
services/news_fetcher/config.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
"""Configuration for the news fetcher service."""
|
||||
|
||||
from shared.config import BaseConfig
|
||||
|
||||
|
||||
class NewsFetcherConfig(BaseConfig):
    """Settings for the news fetcher service.

    Adds the RSS feed list, the per-source poll intervals and the Reddit
    API credentials on top of :class:`BaseConfig`.  Every setting can be
    overridden through an environment variable prefixed with ``TRADING_``.
    """

    # NOTE(review): the list defaults below are mutable class attributes;
    # this presumably relies on BaseConfig copying defaults per instance
    # (as pydantic models do) — confirm against shared.config.

    # ---- RSS ----------------------------------------------------------
    # Feed URLs to poll.
    rss_feeds: list[str] = [
        "https://finance.yahoo.com/news/rssindex",
        "https://feeds.reuters.com/reuters/businessNews",
        "https://feeds.content.dowjones.io/public/rss/mw_topstories",
    ]
    # Pause between two RSS poll cycles, in seconds.
    rss_poll_interval_seconds: int = 300

    # ---- Reddit -------------------------------------------------------
    # Subreddit names, without the "r/" prefix.
    reddit_subreddits: list[str] = ["wallstreetbets", "stocks", "investing"]
    # Pause between two Reddit poll cycles, in seconds.
    reddit_poll_interval_seconds: int = 600
    # Minimum post score for a post to be picked up.
    reddit_min_score: int = 10
    # API credentials; empty by default, so they have to be supplied
    # through the environment before Reddit polling can authenticate.
    reddit_client_id: str = ""
    reddit_client_secret: str = ""
    reddit_user_agent: str = "trading-bot/0.1"
|
||||
152
services/news_fetcher/main.py
Normal file
152
services/news_fetcher/main.py
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
"""News fetcher service entry point.
|
||||
|
||||
Polls RSS feeds and Reddit on independent schedules, deduplicates
|
||||
articles by content hash (via a Redis SET), and publishes new articles
|
||||
to the ``news:raw`` Redis Stream.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from redis.asyncio import Redis
|
||||
|
||||
from shared.redis_streams import StreamPublisher
|
||||
from shared.telemetry import setup_telemetry
|
||||
from services.news_fetcher.config import NewsFetcherConfig
|
||||
from services.news_fetcher.sources.rss import RSSSource
|
||||
from services.news_fetcher.sources.reddit import RedditSource
|
||||
from shared.schemas.news import RawArticle
|
||||
|
||||
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||||
|
||||
# Key of the Redis SET that holds the content hashes of already-seen articles.
SEEN_HASHES_KEY = "news:seen_hashes"
|
||||
# Name of the Redis Stream that newly seen articles are published to.
NEWS_RAW_STREAM = "news:raw"
|
||||
|
||||
|
||||
async def _deduplicate_and_publish(
    articles: list[RawArticle],
    redis: Redis,
    publisher: StreamPublisher,
    articles_fetched_counter,
    fetch_errors_counter,
) -> int:
    """Publish the not-yet-seen articles to the ``news:raw`` stream.

    An article counts as seen once its ``content_hash`` is a member of the
    Redis SET ``SEEN_HASHES_KEY``.  The hash is claimed with ``SADD`` *before*
    publishing, so the concurrently running pollers cannot both publish the
    same article.  If the publish then fails, the claim is rolled back;
    otherwise the article would be treated as a duplicate on every later
    poll and silently lost.

    Args:
        articles: Candidate articles from one poll cycle.
        redis: Async Redis client used for the seen-hash set.
        publisher: Stream publisher the new articles are handed to.
        articles_fetched_counter: Counter incremented by the number of
            articles actually published (also when the batch is aborted
            part-way by an error).
        fetch_errors_counter: Not used here; accepted so both pollers can
            pass their counters uniformly.  Errors are counted by the caller.

    Returns:
        The number of newly published articles.

    Raises:
        Exception: Any error raised by Redis or the publisher is propagated
            to the caller after the seen-hash claim has been released.
    """
    # NOTE(review): nothing visible here ever expires members of the
    # seen-hash set, so it presumably grows without bound — confirm.
    published = 0
    try:
        for article in articles:
            # SADD returns 1 only when the member was not already present.
            if not await redis.sadd(SEEN_HASHES_KEY, article.content_hash):
                continue
            try:
                await publisher.publish(article.model_dump(mode="json"))
            except Exception:
                # Release the claim so the next poll cycle retries this article.
                try:
                    await redis.srem(SEEN_HASHES_KEY, article.content_hash)
                except Exception:
                    logger.exception(
                        "Could not roll back seen-hash %s after a failed publish",
                        article.content_hash,
                    )
                raise
            published += 1
    finally:
        # Count what made it out even when a later article aborts the batch.
        if published:
            articles_fetched_counter.add(published)
    return published
|
||||
|
||||
|
||||
async def _poll_rss(
    source: RSSSource,
    interval: int,
    redis: Redis,
    publisher: StreamPublisher,
    articles_fetched_counter,
    fetch_errors_counter,
) -> None:
    """Run the RSS poll loop forever, pausing *interval* seconds per cycle.

    Each cycle fetches from *source*, publishes the unseen articles and logs
    how many were new.  A failing cycle is logged with its traceback and
    counted on *fetch_errors_counter*; the loop then carries on with the
    next cycle instead of terminating.
    """
    while True:
        try:
            logger.info("Polling RSS feeds …")
            fetched = await source.fetch()
            new_count = await _deduplicate_and_publish(
                fetched,
                redis,
                publisher,
                articles_fetched_counter,
                fetch_errors_counter,
            )
        except Exception:
            logger.exception("RSS poll cycle failed")
            fetch_errors_counter.add(1)
        else:
            logger.info("RSS poll complete: %d new articles published", new_count)
        # Sleep after every cycle, successful or not.
        await asyncio.sleep(interval)
|
||||
|
||||
|
||||
async def _poll_reddit(
    source: RedditSource,
    interval: int,
    redis: Redis,
    publisher: StreamPublisher,
    articles_fetched_counter,
    fetch_errors_counter,
) -> None:
    """Run the Reddit poll loop forever, pausing *interval* seconds per cycle.

    Each cycle fetches from *source*, publishes the unseen posts and logs
    how many were new.  A failing cycle is logged with its traceback and
    counted on *fetch_errors_counter*; the loop then carries on with the
    next cycle instead of terminating.
    """
    while True:
        try:
            logger.info("Polling Reddit …")
            fetched = await source.fetch()
            new_count = await _deduplicate_and_publish(
                fetched,
                redis,
                publisher,
                articles_fetched_counter,
                fetch_errors_counter,
            )
        except Exception:
            logger.exception("Reddit poll cycle failed")
            fetch_errors_counter.add(1)
        else:
            logger.info("Reddit poll complete: %d new articles published", new_count)
        # Sleep after every cycle, successful or not.
        await asyncio.sleep(interval)
|
||||
|
||||
|
||||
async def run() -> None:
    """Boot the news fetcher and poll all sources until the task is stopped.

    Loads :class:`NewsFetcherConfig`, configures logging and the two
    telemetry counters, builds the RSS and Reddit sources and then runs both
    pollers concurrently in a task group.  The pollers loop forever, so this
    coroutine only ends through cancellation or an error escaping a poller.
    The Redis client is closed on the way out in either case.
    """
    config = NewsFetcherConfig()

    logging.basicConfig(level=config.log_level)
    logger.info("Starting news fetcher service")

    # Telemetry
    meter = setup_telemetry("news-fetcher", config.otel_metrics_port)
    articles_fetched_counter = meter.create_counter(
        "news.articles_fetched",
        description="Total articles fetched and published",
    )
    fetch_errors_counter = meter.create_counter(
        "news.fetch_errors",
        description="Total fetch-cycle errors",
    )

    # Sources
    rss_source = RSSSource(feeds=config.rss_feeds)
    reddit_source = RedditSource(
        subreddits=config.reddit_subreddits,
        client_id=config.reddit_client_id,
        client_secret=config.reddit_client_secret,
        user_agent=config.reddit_user_agent,
        min_score=config.reddit_min_score,
    )

    # Redis: the context manager closes the client (and its connection
    # pool) when the pollers stop, instead of leaking it on shutdown.
    redis = Redis.from_url(config.redis_url, decode_responses=True)
    async with redis:
        publisher = StreamPublisher(redis, NEWS_RAW_STREAM)

        # Run pollers concurrently; both share the client and the publisher.
        async with asyncio.TaskGroup() as tg:
            tg.create_task(
                _poll_rss(
                    rss_source,
                    config.rss_poll_interval_seconds,
                    redis,
                    publisher,
                    articles_fetched_counter,
                    fetch_errors_counter,
                )
            )
            tg.create_task(
                _poll_reddit(
                    reddit_source,
                    config.reddit_poll_interval_seconds,
                    redis,
                    publisher,
                    articles_fetched_counter,
                    fetch_errors_counter,
                )
            )
|
||||
|
||||
|
||||
# Script entry point: start the service when this module is executed directly.
if __name__ == "__main__":
    asyncio.run(run())
|
||||
1
services/news_fetcher/sources/__init__.py
Normal file
1
services/news_fetcher/sources/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
"""News source adapters (RSS, Reddit)."""
|
||||
76
services/news_fetcher/sources/reddit.py
Normal file
76
services/news_fetcher/sources/reddit.py
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
"""Reddit source — fetches hot posts from financial subreddits via asyncpraw."""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from shared.schemas.news import RawArticle
|
||||
|
||||
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RedditSource:
    """Adapter that turns hot Reddit posts into :class:`RawArticle` objects."""

    def __init__(
        self,
        subreddits: list[str],
        client_id: str,
        client_secret: str,
        user_agent: str,
        min_score: int = 10,
    ) -> None:
        """Store the subreddit list, the API credentials and the score floor."""
        self.subreddits = subreddits
        self.client_id = client_id
        self.client_secret = client_secret
        self.user_agent = user_agent
        self.min_score = min_score

    async def fetch(self) -> list[RawArticle]:
        """Collect the hot posts scoring at least *min_score* per subreddit.

        Up to 25 hot posts are read from every configured subreddit.  A
        subreddit that fails is logged and skipped; posts gathered before
        the failure are kept.  A fresh ``asyncpraw`` client is opened for
        each call and always closed before returning, so no session
        outlives a poll cycle.
        """
        import asyncpraw  # imported lazily: only needed once fetch() runs

        fetched_at = datetime.now(timezone.utc)
        collected: list[RawArticle] = []

        client = asyncpraw.Reddit(
            client_id=self.client_id,
            client_secret=self.client_secret,
            user_agent=self.user_agent,
        )
        try:
            for name in self.subreddits:
                try:
                    subreddit = await client.subreddit(name)
                    async for post in subreddit.hot(limit=25):
                        if post.score >= self.min_score:
                            collected.append(self._to_article(post, fetched_at))
                except Exception:
                    logger.exception("Failed to fetch subreddit r/%s", name)
        finally:
            await client.close()

        return collected

    @staticmethod
    def _to_article(post, fetched_at: datetime) -> "RawArticle":
        """Convert one Reddit post into a :class:`RawArticle`."""
        path = post.permalink
        return RawArticle(
            source="reddit",
            url=f"https://reddit.com{path}",
            title=post.title,
            # Link posts have no self text; fall back to the linked URL.
            content=post.selftext or post.url,
            published_at=datetime.fromtimestamp(post.created_utc, tz=timezone.utc),
            fetched_at=fetched_at,
            # The permalink identifies the post, so it is the dedup key.
            content_hash=hashlib.sha256(path.encode()).hexdigest(),
        )
|
||||
71
services/news_fetcher/sources/rss.py
Normal file
71
services/news_fetcher/sources/rss.py
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
"""RSS feed source — fetches articles from configurable RSS feed URLs."""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from email.utils import parsedate_to_datetime
|
||||
|
||||
import feedparser
|
||||
|
||||
from shared.schemas.news import RawArticle
|
||||
|
||||
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RSSSource:
    """Fetches and converts RSS feed entries to :class:`RawArticle` instances."""

    def __init__(self, feeds: list[str]) -> None:
        """Remember the feed URLs to poll."""
        self.feeds = feeds

    async def fetch(self) -> list[RawArticle]:
        """Parse every configured feed and return the articles found.

        Failures are isolated at two levels: a feed that cannot be fetched
        or parsed is logged and skipped, and an entry that cannot be
        converted is logged and skipped without discarding the rest of its
        feed.  Entries carrying neither a link nor a title are dropped,
        because they would all share one dedup hash.

        Returns:
            The articles of all feeds, in feed order.
        """
        # Local import: this module does not import asyncio at file level.
        import asyncio

        articles: list[RawArticle] = []
        now = datetime.now(timezone.utc)

        for feed_url in self.feeds:
            try:
                # feedparser.parse() performs blocking network I/O; run it
                # in a worker thread so the event loop (and the other
                # poller) keeps running while the feed downloads.
                parsed = await asyncio.to_thread(feedparser.parse, feed_url)
                if parsed.bozo and not parsed.entries:
                    logger.warning("Feed %s returned bozo error: %s", feed_url, parsed.bozo_exception)
                    continue

                for entry in parsed.entries:
                    try:
                        article = self._entry_to_article(entry, now)
                    except Exception:
                        logger.exception("Skipping malformed entry in RSS feed %s", feed_url)
                        continue
                    if article is not None:
                        articles.append(article)
            except Exception:
                logger.exception("Failed to fetch RSS feed %s", feed_url)
                continue

        return articles

    def _entry_to_article(self, entry, fetched_at: datetime) -> "RawArticle | None":
        """Convert one feed entry, or return None when it has no identity."""
        title = entry.get("title", "")
        link = entry.get("link", "")
        if not link and not title:
            # The dedup hash is built from link + title; without either,
            # every such entry would collide on the same hash.
            logger.debug("Skipping RSS entry without link and title")
            return None

        return RawArticle(
            source="rss",
            url=link,
            title=title,
            content=entry.get("summary", "") or entry.get("description", ""),
            published_at=self._parse_published(entry),
            fetched_at=fetched_at,
            content_hash=hashlib.sha256(f"{link}{title}".encode()).hexdigest(),
        )

    @staticmethod
    def _parse_published(entry: dict) -> datetime | None:
        """Best-effort parsing of the entry's publication date.

        The ``published`` (or ``updated``) string is parsed as an RFC 2822
        date first.  When that fails — for example for the ISO-8601 dates
        used by Atom feeds — the ``published_parsed`` / ``updated_parsed``
        time tuple that feedparser provides (in UTC) is used instead.

        Returns:
            A timezone-aware datetime, or None when no date can be derived.
            Never raises.
        """
        raw = entry.get("published") or entry.get("updated")
        if raw:
            try:
                stamp = parsedate_to_datetime(raw)
            except Exception:
                # Not RFC 2822; fall through to the pre-parsed tuple below.
                stamp = None
            if stamp is not None:
                if stamp.tzinfo is None:
                    # A "-0000" zone yields a naive datetime whose value is
                    # UTC; make it aware so it never mixes with aware values.
                    stamp = stamp.replace(tzinfo=timezone.utc)
                return stamp

        parsed_tuple = entry.get("published_parsed") or entry.get("updated_parsed")
        if parsed_tuple:
            try:
                return datetime(*parsed_tuple[:6], tzinfo=timezone.utc)
            except (TypeError, ValueError):
                return None
        return None
|
||||
Loading…
Add table
Add a link
Reference in a new issue