Add comprehensive logging to Celery tasks and listing processor
This commit is contained in:
parent
f880664a98
commit
c4b11ccfe9
2 changed files with 269 additions and 107 deletions
|
|
@ -13,6 +13,9 @@ from repositories.listing_repository import ListingRepository
|
|||
|
||||
logger = logging.getLogger("uvicorn.error")
|
||||
|
||||
# Also use celery task logger for visibility in worker output
|
||||
celery_logger = logging.getLogger("celery.task")
|
||||
|
||||
|
||||
class ListingProcessor:
|
||||
semaphore: asyncio.Semaphore
|
||||
|
|
@ -36,15 +39,16 @@ class ListingProcessor:
|
|||
for step in self.process_steps:
|
||||
if await step.needs_processing(listing_id):
|
||||
async with self.semaphore:
|
||||
step_name = step.__class__.__name__
|
||||
try:
|
||||
listing = await step.process(listing_id)
|
||||
logger.debug(f"[{listing_id}] {step_name} completed")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process {listing_id=}: {e}")
|
||||
logger.error(f"[{listing_id}] {step_name} failed: {e}")
|
||||
celery_logger.error(f"[{listing_id}] {step_name} failed: {e}")
|
||||
return None
|
||||
return listing
|
||||
|
||||
async def listing_exists(self, listing_id: int) -> bool: ...
|
||||
|
||||
|
||||
class Step:
|
||||
listing_repository: ListingRepository
|
||||
|
|
@ -65,19 +69,23 @@ class FetchListingDetailsStep(Step):
|
|||
existing_listings = await self.listing_repository.get_listings(
|
||||
only_ids=[listing_id]
|
||||
)
|
||||
if (existing_listings) == 0:
|
||||
if len(existing_listings) == 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
async def process(self, listing_id: int) -> Listing:
|
||||
logger.debug(f"Fetching details for {listing_id=}")
|
||||
logger.debug(f"[{listing_id}] Fetching property details from API")
|
||||
celery_logger.info(f"[{listing_id}] Fetching details...")
|
||||
|
||||
existing_listings = await self.listing_repository.get_listings(
|
||||
only_ids=[listing_id]
|
||||
)
|
||||
now = datetime.now()
|
||||
if len(existing_listings) > 0:
|
||||
# listing exists, do not refresh
|
||||
logger.debug(f"[{listing_id}] Already exists, skipping refresh")
|
||||
return existing_listings[0]
|
||||
|
||||
listing_details = await detail_query(listing_id)
|
||||
|
||||
furnish_type_str = listing_details["property"].get("letFurnishType", "unknown")
|
||||
|
|
@ -124,7 +132,12 @@ class FetchListingDetailsStep(Step):
|
|||
additional_info=listing_details,
|
||||
)
|
||||
await self.listing_repository.upsert_listings([listing])
|
||||
logger.debug(f"Completed fetching details for {listing_id=}")
|
||||
|
||||
celery_logger.info(
|
||||
f"[{listing_id}] Details fetched: £{listing.price}, "
|
||||
f"{listing.number_of_bedrooms}BR, {listing.agency}"
|
||||
)
|
||||
logger.debug(f"[{listing_id}] Details fetch complete")
|
||||
# TODO: dump to filesystem
|
||||
return listing
|
||||
|
||||
|
|
@ -140,7 +153,8 @@ class FetchImagesStep(Step):
|
|||
return len(listing.floorplan_image_paths) == 0
|
||||
|
||||
async def process(self, listing_id: int) -> Listing:
|
||||
logger.debug(f"Fetching images for {listing_id=}")
|
||||
logger.debug(f"[{listing_id}] Fetching floorplan images")
|
||||
|
||||
existing_listings = await self.listing_repository.get_listings(
|
||||
only_ids=[listing_id]
|
||||
)
|
||||
|
|
@ -152,6 +166,12 @@ class FetchImagesStep(Step):
|
|||
all_floorplans = listing.additional_info.get("property", {}).get(
|
||||
"floorplans", []
|
||||
)
|
||||
|
||||
if len(all_floorplans) == 0:
|
||||
logger.debug(f"[{listing_id}] No floorplans available")
|
||||
return listing
|
||||
|
||||
downloaded = 0
|
||||
client_timeout = aiohttp.ClientTimeout(total=30)
|
||||
for floorplan_obj in all_floorplans:
|
||||
url = floorplan_obj["url"]
|
||||
|
|
@ -169,8 +189,12 @@ class FetchImagesStep(Step):
|
|||
with open(floorplan_path, "wb") as f:
|
||||
f.write(await response.read())
|
||||
listing.floorplan_image_paths.append(str(floorplan_path))
|
||||
downloaded += 1
|
||||
|
||||
await self.listing_repository.upsert_listings([listing])
|
||||
logger.debug(f"Completed fetching images for {listing_id=}")
|
||||
|
||||
celery_logger.info(f"[{listing_id}] Downloaded {downloaded} floorplan images")
|
||||
logger.debug(f"[{listing_id}] Image fetch complete")
|
||||
return listing
|
||||
|
||||
|
||||
|
|
@ -188,11 +212,19 @@ class DetectFloorplanStep(Step):
|
|||
return listings[0].square_meters is None
|
||||
|
||||
async def process(self, listing_id: int) -> Listing:
|
||||
logger.debug(f"Running floorplan detection for {listing_id=}")
|
||||
logger.debug(f"[{listing_id}] Running OCR on floorplans")
|
||||
|
||||
listings = await self.listing_repository.get_listings(only_ids=[listing_id])
|
||||
if len(listings) == 0:
|
||||
raise ValueError(f"Listing {listing_id} does not exist")
|
||||
listing = listings[0]
|
||||
|
||||
if len(listing.floorplan_image_paths) == 0:
|
||||
logger.debug(f"[{listing_id}] No floorplan images to process")
|
||||
listing.square_meters = 0
|
||||
await self.listing_repository.upsert_listings([listing])
|
||||
return listing
|
||||
|
||||
sqms = []
|
||||
for floorplan_path in listing.floorplan_image_paths:
|
||||
async with self.ocr_semaphore:
|
||||
|
|
@ -201,9 +233,15 @@ class DetectFloorplanStep(Step):
|
|||
)
|
||||
if estimated_sqm is not None:
|
||||
sqms.append(estimated_sqm)
|
||||
|
||||
max_sqm = max(sqms, default=0) # try once, if we fail, keep as 0
|
||||
# if max_sqm is not None:
|
||||
listing.square_meters = max_sqm
|
||||
await self.listing_repository.upsert_listings([listing])
|
||||
logger.debug(f"Completed running floorplan detection for {listing_id=}")
|
||||
|
||||
if max_sqm > 0:
|
||||
celery_logger.info(f"[{listing_id}] OCR detected {max_sqm} sqm")
|
||||
else:
|
||||
logger.debug(f"[{listing_id}] OCR: no square meters detected")
|
||||
|
||||
logger.debug(f"[{listing_id}] OCR complete")
|
||||
return listing
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue