expiry tasks after 10 minutes; also handle expired tasks

This commit is contained in:
Viktor Barzin 2025-06-23 19:43:54 +00:00
parent 7cea586f41
commit a0d099e62b
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863

View file

@ -1,5 +1,6 @@
import asyncio
import dataclasses
from datetime import datetime, timedelta
import json
import logging
import logging.config
@ -24,6 +25,7 @@ from ui_exporter import export_immoweb
from alembic import command
from alembic.config import Config
from contextlib import asynccontextmanager
from celery.exceptions import TaskRevokedError
load_dotenv()
logger = logging.getLogger("uvicorn")
@ -78,7 +80,11 @@ async def refresh_listings(
query_parameters: Annotated[QueryParameters, Query()],
) -> dict[str, str]:
# TODO: rate limit
task = listing_tasks.dump_listings_task.delay(query_parameters.json())
expiry_time = datetime.now() + timedelta(minutes=10)
task = listing_tasks.dump_listings_task.apply_async(
args=(query_parameters.model_dump_json(),),
expires=expiry_time,
)
return {"task_id": task.id}
@ -88,8 +94,12 @@ async def get_task_status(
task_id: str,
) -> dict[str, str]:
task_result = listing_tasks.dump_listings_task.AsyncResult(task_id)
result = task_result.result
if type(task_result.result) is TaskRevokedError:
result = str(task_result.result)
return {
"task_id": task_id,
"status": task_result.status,
"result": json.dumps(task_result.result),
"result": json.dumps(result),
}