From e5ce8c12015bd8e6b71a0d3900409396d256c288 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 7 Feb 2026 23:34:08 +0000 Subject: [PATCH] Fix buy listing support: thread ListingType through processing pipeline The listing processor was hardcoded to create RentListing objects and query only the rentlisting table. Buy listings fetched from Rightmove were stored in the wrong table with missing fields. This threads ListingType through ListingProcessor and all Step subclasses so the correct model (RentListing/BuyListing) is created, the correct table is queried, and buy-specific fields (service_charge, lease_left) are parsed from the API response and included in GeoJSON streaming output. --- listing_processor.py | 110 ++++++++++++++++++++------- repositories/listing_repository.py | 8 +- services/listing_fetcher.py | 2 +- tasks/listing_tasks.py | 2 +- tests/unit/test_listing_processor.py | 4 +- ui_exporter.py | 31 ++++++-- 6 files changed, 116 insertions(+), 41 deletions(-) diff --git a/listing_processor.py b/listing_processor.py index f6394cf..b2390fb 100644 --- a/listing_processor.py +++ b/listing_processor.py @@ -6,9 +6,19 @@ from datetime import datetime import logging import multiprocessing from pathlib import Path +import re +from typing import Any from urllib.parse import urlparse import aiohttp -from models.listing import FurnishType, Listing, ListingSite, RentListing +from models.listing import ( + BuyListing, + FurnishType, + Listing, + ListingSite, + ListingType, + QueryParameters, + RentListing, +) from rec import floorplan from rec.query import detail_query from repositories.listing_repository import ListingRepository @@ -56,15 +66,16 @@ class ListingProcessor: "DetectFloorplanStep": "ocr", } - def __init__(self, listing_repository: ListingRepository): + def __init__(self, listing_repository: ListingRepository, listing_type: ListingType = ListingType.RENT): self.semaphore = asyncio.Semaphore(20) self.listing_repository = listing_repository + self.listing_type = listing_type # Register new processing steps here # Order is important self.process_steps = [ - FetchListingDetailsStep(listing_repository), - FetchImagesStep(listing_repository), - DetectFloorplanStep(listing_repository), + FetchListingDetailsStep(listing_repository, listing_type), + FetchImagesStep(listing_repository, listing_type), + DetectFloorplanStep(listing_repository, listing_type), ] async def process_listing( @@ -72,7 +83,7 @@ class ListingProcessor: listing_id: int, on_step_complete: Callable[[str], None] | None = None, ) -> Listing | None: - await self.listing_repository.mark_seen(listing_id) + await self.listing_repository.mark_seen(listing_id, self.listing_type) listing = None for step in self.process_steps: if await step.needs_processing(listing_id): @@ -94,9 +105,15 @@ class ListingProcessor: class Step: listing_repository: ListingRepository + listing_type: ListingType - def __init__(self, listing_repository: ListingRepository): + def __init__(self, listing_repository: ListingRepository, listing_type: ListingType = ListingType.RENT): self.listing_repository = listing_repository + self.listing_type = listing_type + + def _query_params(self) -> QueryParameters: + """Build minimal QueryParameters for ID-based lookups in the correct table.""" + return QueryParameters(listing_type=self.listing_type) @abstractmethod async def process(self, listing_id: int) -> Listing: ... @@ -106,10 +123,36 @@ class Step: return True +def _parse_service_charge(details: dict[str, Any]) -> float | None: + """Parse annual service charge from the tenure info in API response.""" + tenure_content = ( + details.get("property", {}).get("tenureInfo", {}).get("content", []) + ) + for item in tenure_content: + if item.get("type") == "annualServiceCharge": + matches = re.findall(r"([\d,.]+)", str(item.get("value", ""))) + if matches: + return float(matches[0].replace(",", "")) + return None + + +def _parse_lease_left(details: dict[str, Any]) -> int | None: + """Parse remaining lease years from the tenure info in API response.""" + tenure_content = ( + details.get("property", {}).get("tenureInfo", {}).get("content", []) + ) + for item in tenure_content: + if item.get("type") == "lengthOfLease": + matches = re.findall(r"(\d+)", str(item.get("value", ""))) + if matches: + return int(matches[0]) + return None + + class FetchListingDetailsStep(Step): async def needs_processing(self, listing_id: int) -> bool: existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) if len(existing_listings) == 0: return True @@ -120,7 +163,7 @@ class FetchListingDetailsStep(Step): logger.info(f"[{listing_id}] Fetching details...") existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) now = datetime.now() if len(existing_listings) > 0: @@ -130,16 +173,8 @@ class FetchListingDetailsStep(Step): listing_details = await detail_query(listing_id) - furnish_type = _parse_furnish_type( - listing_details["property"].get("letFurnishType", "unknown") - ) - - available_from = _parse_available_from( - listing_details["property"]["letDateAvailable"] - ) - photos = listing_details["property"]["photos"] - listing = RentListing( # TODO: should pick based on price? + common_kwargs = dict( id=listing_id, price=listing_details["property"]["price"], number_of_bedrooms=listing_details["property"]["bedrooms"], @@ -154,10 +189,29 @@ class FetchListingDetailsStep(Step): listing_site=ListingSite.RIGHTMOVE, last_seen=now, photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None, - furnish_type=furnish_type, - available_from=available_from, additional_info=listing_details, ) + + listing: Listing + if self.listing_type == ListingType.BUY: + listing = BuyListing( + **common_kwargs, + service_charge=_parse_service_charge(listing_details), + lease_left=_parse_lease_left(listing_details), + ) + else: + furnish_type = _parse_furnish_type( + listing_details["property"].get("letFurnishType", "unknown") + ) + available_from = _parse_available_from( + listing_details["property"]["letDateAvailable"] + ) + listing = RentListing( + **common_kwargs, + furnish_type=furnish_type, + available_from=available_from, + ) + await self.listing_repository.upsert_listings([listing]) logger.info( @@ -172,7 +226,7 @@ class FetchListingDetailsStep(Step): class FetchImagesStep(Step): async def needs_processing(self, listing_id: int) -> bool: existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) if len(existing_listings) == 0: return False # if listing doesn't exist, we can't process it @@ -183,7 +237,7 @@ class FetchImagesStep(Step): logger.debug(f"[{listing_id}] Fetching floorplan images") existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) if len(existing_listings) == 0: raise ValueError(f"Listing {listing_id} not found") @@ -228,12 +282,14 @@ class FetchImagesStep(Step): class DetectFloorplanStep(Step): ocr_semaphore: asyncio.Semaphore - def __init__(self, listing_repository: ListingRepository): - super().__init__(listing_repository) + def __init__(self, listing_repository: ListingRepository, listing_type: ListingType = ListingType.RENT): + super().__init__(listing_repository, listing_type) self.ocr_semaphore = asyncio.Semaphore(MAX_OCR_WORKERS) async def needs_processing(self, listing_id: int) -> bool: - listings = await self.listing_repository.get_listings(only_ids=[listing_id]) + listings = await self.listing_repository.get_listings( + only_ids=[listing_id], query_parameters=self._query_params() + ) if len(listings) == 0: return False return listings[0].square_meters is None @@ -241,7 +297,9 @@ class DetectFloorplanStep(Step): async def process(self, listing_id: int) -> Listing: logger.debug(f"[{listing_id}] Running OCR on floorplans") - listings = await self.listing_repository.get_listings(only_ids=[listing_id]) + listings = await self.listing_repository.get_listings( + only_ids=[listing_id], query_parameters=self._query_params() + ) if len(listings) == 0: raise ValueError(f"Listing {listing_id} does not exist") listing = listings[0] diff --git a/repositories/listing_repository.py b/repositories/listing_repository.py index af5563b..d3300f1 100644 --- a/repositories/listing_repository.py +++ b/repositories/listing_repository.py @@ -20,7 +20,8 @@ logger = logging.getLogger("uvicorn.error") STREAMING_COLUMNS = [ 'id', 'price', 'number_of_bedrooms', 'square_meters', 'longitude', 'latitude', 'photo_thumbnail', 'last_seen', - 'agency', 'price_history_json', 'available_from' + 'agency', 'price_history_json', 'available_from', + 'service_charge', 'lease_left', ] @@ -348,8 +349,9 @@ class ListingRepository: result = session.execute(sa_select(model.id)) return {row[0] for row in result.fetchall()} - async def mark_seen(self, listing_id: int) -> None: - listings = await self.get_listings(only_ids=[listing_id]) + async def mark_seen(self, listing_id: int, listing_type: ListingType = ListingType.RENT) -> None: + query_params = QueryParameters(listing_type=listing_type) + listings = await self.get_listings(only_ids=[listing_id], query_parameters=query_params) if len(listings) == 0: return listing = listings[0] diff --git a/services/listing_fetcher.py b/services/listing_fetcher.py index aaa1f77..ee00aee 100644 --- a/services/listing_fetcher.py +++ b/services/listing_fetcher.py @@ -182,7 +182,7 @@ async def dump_listings( listing_id = await queue.get() if listing_id is None: break - listing_processor = ListingProcessor(repository) + listing_processor = ListingProcessor(repository, parameters.listing_type) listing = await listing_processor.process_listing(listing_id) if listing is not None: processed_listings.append(listing) diff --git a/tasks/listing_tasks.py b/tasks/listing_tasks.py index ab5adf4..33c0628 100644 --- a/tasks/listing_tasks.py +++ b/tasks/listing_tasks.py @@ -397,7 +397,7 @@ async def _dump_listings_full_inner( }, ) - listing_processor = ListingProcessor(repository) + listing_processor = ListingProcessor(repository, parameters.listing_type) # Producer: fetch all subqueries concurrently, then signal workers to stop async def producer() -> None: diff --git a/tests/unit/test_listing_processor.py b/tests/unit/test_listing_processor.py index 47b9315..a515638 100644 --- a/tests/unit/test_listing_processor.py +++ b/tests/unit/test_listing_processor.py @@ -2,7 +2,7 @@ from datetime import datetime from unittest.mock import AsyncMock, MagicMock, patch import pytest -from models.listing import FurnishType +from models.listing import FurnishType, ListingType from listing_processor import ( _parse_furnish_type, _parse_available_from, @@ -69,7 +69,7 @@ class TestListingProcessor: for step in processor.process_steps: step.needs_processing = AsyncMock(return_value=False) await processor.process_listing(123) - mock_repo.mark_seen.assert_awaited_once_with(123) + mock_repo.mark_seen.assert_awaited_once_with(123, ListingType.RENT) async def test_process_listing_returns_none_on_step_failure(self): """Test that a step failure returns None.""" diff --git a/ui_exporter.py b/ui_exporter.py index 8636d9f..fed36d6 100644 --- a/ui_exporter.py +++ b/ui_exporter.py @@ -53,9 +53,7 @@ def convert_row_to_geojson(row: dict[str, Any], listing_type: str = "RENT") -> d else: last_seen_str = str(last_seen_val) - return { - "type": "Feature", - "properties": { + properties: dict[str, Any] = { "listing_type": listing_type, "city": "London", "country": "United Kingdom", @@ -69,7 +67,16 @@ def convert_row_to_geojson(row: dict[str, Any], listing_type: str = "RENT") -> d "price_history": price_history, "agency": row.get('agency'), "available_from": available_from_str, - }, + } + + if row.get('service_charge') is not None: + properties["service_charge"] = row['service_charge'] + if row.get('lease_left') is not None: + properties["lease_left"] = row['lease_left'] + + return { + "type": "Feature", + "properties": properties, "geometry": { "coordinates": [row['longitude'], row['latitude']], "type": "Point", @@ -90,9 +97,7 @@ def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, A property_info = listing.additional_info.get("property", {}) if listing.additional_info else {} listing_type = "RENT" if isinstance(listing, RentListing) else "BUY" - return { - "type": "Feature", - "properties": { + properties: dict[str, Any] = { "listing_type": listing_type, "city": "London", # change me "country": "United Kingdom", @@ -106,7 +111,17 @@ def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, A "price_history": [item.to_dict() for item in listing.price_history], "agency": listing.agency, "available_from": property_info.get("letDateAvailable", None), - }, + } + + if isinstance(listing, BuyListing): + if listing.service_charge is not None: + properties["service_charge"] = listing.service_charge + if listing.lease_left is not None: + properties["lease_left"] = listing.lease_left + + return { + "type": "Feature", + "properties": properties, "geometry": { "coordinates": [ listing.longitude,