From a0d099e62b313576d3c0af4b0b8c80d0aef649de Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Mon, 23 Jun 2025 19:43:54 +0000 Subject: [PATCH] expiry tasks after 10 minutes; also handle expired tasks --- crawler/api/app.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/crawler/api/app.py b/crawler/api/app.py index f348e5a..8c7db71 100644 --- a/crawler/api/app.py +++ b/crawler/api/app.py @@ -1,5 +1,6 @@ import asyncio import dataclasses +from datetime import datetime, timedelta import json import logging import logging.config @@ -24,6 +25,7 @@ from ui_exporter import export_immoweb from alembic import command from alembic.config import Config from contextlib import asynccontextmanager +from celery.exceptions import TaskRevokedError load_dotenv() logger = logging.getLogger("uvicorn") @@ -78,7 +80,11 @@ async def refresh_listings( query_parameters: Annotated[QueryParameters, Query()], ) -> dict[str, str]: # TODO: rate limit - task = listing_tasks.dump_listings_task.delay(query_parameters.json()) + expiry_time = datetime.now() + timedelta(minutes=10) + task = listing_tasks.dump_listings_task.apply_async( + args=(query_parameters.model_dump_json(),), + expires=expiry_time, + ) return {"task_id": task.id} @@ -88,8 +94,12 @@ async def get_task_status( task_id: str, ) -> dict[str, str]: task_result = listing_tasks.dump_listings_task.AsyncResult(task_id) + result = task_result.result + if type(task_result.result) is TaskRevokedError: + result = str(task_result.result) + return { "task_id": task_id, "status": task_result.status, - "result": json.dumps(task_result.result), + "result": json.dumps(result), }