`compute_aggregate_snapshot` returns ORM rows that were created inside a `with Session(engine)` block, so by the time the Celery task tried to serialise their attributes into the result dict, the session had already closed, triggering SQLAlchemy's DetachedInstanceError. Combined with acks_late, this caused the task to be redelivered repeatedly (4× on the first manual trigger). Fix: drop the per-row dict serialisation from the task's return value and keep only `aggregates_written: int`. The per-band stats are already logged by the aggregator's own info-level lines, so no observability is lost. Caught when manually firing the task on prod to seed today's snapshot before the 04:00 UTC daily run. The aggregator itself ran fine (the rows were written before the session closed); only the post-return attribute access was broken. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
51 lines
1.8 KiB
Python
51 lines
1.8 KiB
Python
"""Daily market-trend aggregator Celery task.

Fires daily at 04:00 UTC — one hour after the 03:00 UTC RENT scrape, so the
data is fresh. Calls into `services.market_aggregator` to:

1. Recompute per-listing `price_14d_ago` / `price_change_pct_14d`.
2. Upsert the per-(listing_type, bedroom-band) rows in
   `dailylistingaggregate` for today's snapshot.

Idempotent: re-running on the same day refreshes both outputs in place.
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from celery_app import app
|
|
from database import engine
|
|
from services import market_aggregator
|
|
|
|
# Module logger. It is looked up by the literal name "celery_app" rather than
# __name__, so records from this module go through whatever handlers/levels
# are configured for that logger name (presumably set up alongside the Celery
# app — NOTE(review): confirm in celery_app).
celery_logger = logging.getLogger("celery_app")
|
|
|
|
|
|
@app.task(
    bind=True,
    name="tasks.market_tasks.compute_daily_market_aggregates_task",
    time_limit=3600,
    soft_time_limit=3500,
    acks_late=True,
)
def compute_daily_market_aggregates_task(self: Any) -> dict[str, Any]:
    """Run both stages of the daily market aggregator.

    Stage 1 calls ``market_aggregator.update_per_listing_trend(engine)``;
    stage 2 calls ``market_aggregator.compute_aggregate_snapshot(engine)``.
    Exceptions from either stage are not caught here and propagate to Celery.

    Returns:
        A plain dict with:
          * ``status`` — always ``"ok"`` when the function returns normally.
          * ``per_listing`` — stage 1's return value, passed through unchanged
            (it must support ``.get``, since it is read that way below).
          * ``aggregates_written`` — ``len()`` of what stage 2 returned.
    """
    task_id = self.request.id
    celery_logger.info("Starting daily market aggregator (task=%s)", task_id)

    trend_stats = market_aggregator.update_per_listing_trend(engine)
    snapshot_rows = market_aggregator.compute_aggregate_snapshot(engine)

    # Count the snapshot rows only — never read their attributes here. The
    # ORM objects come from a session that has already closed, so touching an
    # unloaded (or, with expire_on_commit, expired) attribute can raise
    # DetachedInstanceError. The aggregator's own log lines already record
    # the per-band stats.
    written = len(snapshot_rows)

    celery_logger.info(
        "Daily market aggregator complete: rent_updated=%s buy_updated=%s aggregates=%d",
        trend_stats.get("rent_updated"),
        trend_stats.get("buy_updated"),
        written,
    )

    # Only plain, serialisable values go into the Celery result.
    return {
        "status": "ok",
        "per_listing": trend_stats,
        "aggregates_written": written,
    }
|