From 01a940b9b61867da4d7b43ce89b6faea7c107ad5 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 16 May 2026 12:53:26 +0000 Subject: [PATCH] wrongmove: fix DetachedInstanceError in daily market aggregator task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `compute_aggregate_snapshot` returns ORM rows that were created inside a `with Session(engine)` block — by the time the Celery task tries to serialise their attributes into the result dict the session has closed, triggering SQLAlchemy's DetachedInstanceError. Combined with acks_late this caused the task to be redelivered repeatedly (4× in the first manual trigger). Fix: drop the per-row dict-serialisation in the task return — keep just `aggregates_written: int`. The per-band stats are already logged by the aggregator's own info-level lines, so no observability is lost. Caught when manually firing the task on prod to seed today's snapshot before the 04:00 UTC daily fire. Aggregator itself ran fine (the rows were written before the session closed); only the post-return access was broken. Co-Authored-By: Claude Opus 4.7 --- tasks/market_tasks.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/tasks/market_tasks.py b/tasks/market_tasks.py index e57fd79..12c1da8 100644 --- a/tasks/market_tasks.py +++ b/tasks/market_tasks.py @@ -32,26 +32,20 @@ def compute_daily_market_aggregates_task(self: Any) -> dict[str, Any]: celery_logger.info("Starting daily market aggregator (task=%s)", self.request.id) per_listing = market_aggregator.update_per_listing_trend(engine) aggregates = market_aggregator.compute_aggregate_snapshot(engine) + # Materialise only the count — the row objects came from a session + # that's already closed, so accessing any lazy-loaded attribute would + # raise DetachedInstanceError. The aggregator's own logger lines have + # already printed the per-band stats. + aggregates_count = len(aggregates) result = { "status": "ok", "per_listing": per_listing, - "aggregates": [ - { - "snapshot_date": a.snapshot_date.isoformat(), - "listing_type": a.listing_type, - "min_bedrooms": a.min_bedrooms, - "max_bedrooms": a.max_bedrooms, - "listing_count": a.listing_count, - "median_total_price": a.median_total_price, - "median_qmprice": a.median_qmprice, - } - for a in aggregates - ], + "aggregates_written": aggregates_count, } celery_logger.info( - "Daily market aggregator complete: rent_updated=%s buy_updated=%s aggregates=%s", + "Daily market aggregator complete: rent_updated=%s buy_updated=%s aggregates=%d", per_listing.get("rent_updated"), per_listing.get("buy_updated"), - len(aggregates), + aggregates_count, ) return result