diff --git a/listing_processor.py b/listing_processor.py index f6394cf..b2390fb 100644 --- a/listing_processor.py +++ b/listing_processor.py @@ -6,9 +6,19 @@ from datetime import datetime import logging import multiprocessing from pathlib import Path +import re +from typing import Any from urllib.parse import urlparse import aiohttp -from models.listing import FurnishType, Listing, ListingSite, RentListing +from models.listing import ( + BuyListing, + FurnishType, + Listing, + ListingSite, + ListingType, + QueryParameters, + RentListing, +) from rec import floorplan from rec.query import detail_query from repositories.listing_repository import ListingRepository @@ -56,15 +66,16 @@ class ListingProcessor: "DetectFloorplanStep": "ocr", } - def __init__(self, listing_repository: ListingRepository): + def __init__(self, listing_repository: ListingRepository, listing_type: ListingType = ListingType.RENT): self.semaphore = asyncio.Semaphore(20) self.listing_repository = listing_repository + self.listing_type = listing_type # Register new processing steps here # Order is important self.process_steps = [ - FetchListingDetailsStep(listing_repository), - FetchImagesStep(listing_repository), - DetectFloorplanStep(listing_repository), + FetchListingDetailsStep(listing_repository, listing_type), + FetchImagesStep(listing_repository, listing_type), + DetectFloorplanStep(listing_repository, listing_type), ] async def process_listing( @@ -72,7 +83,7 @@ class ListingProcessor: listing_id: int, on_step_complete: Callable[[str], None] | None = None, ) -> Listing | None: - await self.listing_repository.mark_seen(listing_id) + await self.listing_repository.mark_seen(listing_id, self.listing_type) listing = None for step in self.process_steps: if await step.needs_processing(listing_id): @@ -94,9 +105,15 @@ class ListingProcessor: class Step: listing_repository: ListingRepository + listing_type: ListingType - def __init__(self, listing_repository: ListingRepository): + def __init__(self, listing_repository: ListingRepository, listing_type: ListingType = ListingType.RENT): self.listing_repository = listing_repository + self.listing_type = listing_type + + def _query_params(self) -> QueryParameters: + """Build minimal QueryParameters for ID-based lookups in the correct table.""" + return QueryParameters(listing_type=self.listing_type) @abstractmethod async def process(self, listing_id: int) -> Listing: ... @@ -106,10 +123,36 @@ class Step: return True +def _parse_service_charge(details: dict[str, Any]) -> float | None: + """Parse annual service charge from the tenure info in API response.""" + tenure_content = ( + details.get("property", {}).get("tenureInfo", {}).get("content", []) + ) + for item in tenure_content: + if item.get("type") == "annualServiceCharge": + matches = re.findall(r"([\d,.]+)", str(item.get("value", ""))) + if matches: + return float(matches[0].replace(",", "")) + return None + + +def _parse_lease_left(details: dict[str, Any]) -> int | None: + """Parse remaining lease years from the tenure info in API response.""" + tenure_content = ( + details.get("property", {}).get("tenureInfo", {}).get("content", []) + ) + for item in tenure_content: + if item.get("type") == "lengthOfLease": + matches = re.findall(r"(\d+)", str(item.get("value", ""))) + if matches: + return int(matches[0]) + return None + + class FetchListingDetailsStep(Step): async def needs_processing(self, listing_id: int) -> bool: existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) if len(existing_listings) == 0: return True @@ -120,7 +163,7 @@ class FetchListingDetailsStep(Step): logger.info(f"[{listing_id}] Fetching details...") existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) now = datetime.now() if len(existing_listings) > 0: @@ -130,16 +173,8 @@ class FetchListingDetailsStep(Step): listing_details = await detail_query(listing_id) - furnish_type = _parse_furnish_type( - listing_details["property"].get("letFurnishType", "unknown") - ) - - available_from = _parse_available_from( - listing_details["property"]["letDateAvailable"] - ) - photos = listing_details["property"]["photos"] - listing = RentListing( # TODO: should pick based on price? + common_kwargs = dict( id=listing_id, price=listing_details["property"]["price"], number_of_bedrooms=listing_details["property"]["bedrooms"], @@ -154,10 +189,29 @@ class FetchListingDetailsStep(Step): listing_site=ListingSite.RIGHTMOVE, last_seen=now, photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None, - furnish_type=furnish_type, - available_from=available_from, additional_info=listing_details, ) + + listing: Listing + if self.listing_type == ListingType.BUY: + listing = BuyListing( + **common_kwargs, + service_charge=_parse_service_charge(listing_details), + lease_left=_parse_lease_left(listing_details), + ) + else: + furnish_type = _parse_furnish_type( + listing_details["property"].get("letFurnishType", "unknown") + ) + available_from = _parse_available_from( + listing_details["property"]["letDateAvailable"] + ) + listing = RentListing( + **common_kwargs, + furnish_type=furnish_type, + available_from=available_from, + ) + await self.listing_repository.upsert_listings([listing]) logger.info( @@ -172,7 +226,7 @@ class FetchListingDetailsStep(Step): class FetchImagesStep(Step): async def needs_processing(self, listing_id: int) -> bool: existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) if len(existing_listings) == 0: return False # if listing doesn't exist, we can't process it @@ -183,7 +237,7 @@ class FetchImagesStep(Step): logger.debug(f"[{listing_id}] Fetching floorplan images") existing_listings = await self.listing_repository.get_listings( - only_ids=[listing_id] + only_ids=[listing_id], query_parameters=self._query_params() ) if len(existing_listings) == 0: raise ValueError(f"Listing {listing_id} not found") @@ -228,12 +282,14 @@ class FetchImagesStep(Step): class DetectFloorplanStep(Step): ocr_semaphore: asyncio.Semaphore - def __init__(self, listing_repository: ListingRepository): - super().__init__(listing_repository) + def __init__(self, listing_repository: ListingRepository, listing_type: ListingType = ListingType.RENT): + super().__init__(listing_repository, listing_type) self.ocr_semaphore = asyncio.Semaphore(MAX_OCR_WORKERS) async def needs_processing(self, listing_id: int) -> bool: - listings = await self.listing_repository.get_listings(only_ids=[listing_id]) + listings = await self.listing_repository.get_listings( + only_ids=[listing_id], query_parameters=self._query_params() + ) if len(listings) == 0: return False return listings[0].square_meters is None @@ -241,7 +297,9 @@ class DetectFloorplanStep(Step): async def process(self, listing_id: int) -> Listing: logger.debug(f"[{listing_id}] Running OCR on floorplans") - listings = await self.listing_repository.get_listings(only_ids=[listing_id]) + listings = await self.listing_repository.get_listings( + only_ids=[listing_id], query_parameters=self._query_params() + ) if len(listings) == 0: raise ValueError(f"Listing {listing_id} does not exist") listing = listings[0] diff --git a/repositories/listing_repository.py b/repositories/listing_repository.py index af5563b..d3300f1 100644 --- a/repositories/listing_repository.py +++ b/repositories/listing_repository.py @@ -20,7 +20,8 @@ logger = logging.getLogger("uvicorn.error") STREAMING_COLUMNS = [ 'id', 'price', 'number_of_bedrooms', 'square_meters', 'longitude', 'latitude', 'photo_thumbnail', 'last_seen', - 'agency', 'price_history_json', 'available_from' + 'agency', 'price_history_json', 'available_from', + 'service_charge', 'lease_left', ] @@ -348,8 +349,9 @@ class ListingRepository: result = session.execute(sa_select(model.id)) return {row[0] for row in result.fetchall()} - async def mark_seen(self, listing_id: int) -> None: - listings = await self.get_listings(only_ids=[listing_id]) + async def mark_seen(self, listing_id: int, listing_type: ListingType = ListingType.RENT) -> None: + query_params = QueryParameters(listing_type=listing_type) + listings = await self.get_listings(only_ids=[listing_id], query_parameters=query_params) if len(listings) == 0: return listing = listings[0] diff --git a/services/listing_fetcher.py b/services/listing_fetcher.py index aaa1f77..ee00aee 100644 --- a/services/listing_fetcher.py +++ b/services/listing_fetcher.py @@ -182,7 +182,7 @@ async def dump_listings( listing_id = await queue.get() if listing_id is None: break - listing_processor = ListingProcessor(repository) + listing_processor = ListingProcessor(repository, parameters.listing_type) listing = await listing_processor.process_listing(listing_id) if listing is not None: processed_listings.append(listing) diff --git a/tasks/listing_tasks.py b/tasks/listing_tasks.py index ab5adf4..33c0628 100644 --- a/tasks/listing_tasks.py +++ b/tasks/listing_tasks.py @@ -397,7 +397,7 @@ async def _dump_listings_full_inner( }, ) - listing_processor = ListingProcessor(repository) + listing_processor = ListingProcessor(repository, parameters.listing_type) # Producer: fetch all subqueries concurrently, then signal workers to stop async def producer() -> None: diff --git a/tests/unit/test_listing_processor.py b/tests/unit/test_listing_processor.py index 47b9315..a515638 100644 --- a/tests/unit/test_listing_processor.py +++ b/tests/unit/test_listing_processor.py @@ -2,7 +2,7 @@ from datetime import datetime from unittest.mock import AsyncMock, MagicMock, patch import pytest -from models.listing import FurnishType +from models.listing import FurnishType, ListingType from listing_processor import ( _parse_furnish_type, _parse_available_from, @@ -69,7 +69,7 @@ class TestListingProcessor: for step in processor.process_steps: step.needs_processing = AsyncMock(return_value=False) await processor.process_listing(123) - mock_repo.mark_seen.assert_awaited_once_with(123) + mock_repo.mark_seen.assert_awaited_once_with(123, ListingType.RENT) async def test_process_listing_returns_none_on_step_failure(self): """Test that a step failure returns None.""" diff --git a/ui_exporter.py b/ui_exporter.py index 8636d9f..fed36d6 100644 --- a/ui_exporter.py +++ b/ui_exporter.py @@ -53,9 +53,7 @@ def convert_row_to_geojson(row: dict[str, Any], listing_type: str = "RENT") -> d else: last_seen_str = str(last_seen_val) - return { - "type": "Feature", - "properties": { + properties: dict[str, Any] = { "listing_type": listing_type, "city": "London", "country": "United Kingdom", @@ -69,7 +67,16 @@ def convert_row_to_geojson(row: dict[str, Any], listing_type: str = "RENT") -> d "price_history": price_history, "agency": row.get('agency'), "available_from": available_from_str, - }, + } + + if row.get('service_charge') is not None: + properties["service_charge"] = row['service_charge'] + if row.get('lease_left') is not None: + properties["lease_left"] = row['lease_left'] + + return { + "type": "Feature", + "properties": properties, "geometry": { "coordinates": [row['longitude'], row['latitude']], "type": "Point", @@ -90,9 +97,7 @@ def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, A property_info = listing.additional_info.get("property", {}) if listing.additional_info else {} listing_type = "RENT" if isinstance(listing, RentListing) else "BUY" - return { - "type": "Feature", - "properties": { + properties: dict[str, Any] = { "listing_type": listing_type, "city": "London", # change me "country": "United Kingdom", @@ -106,7 +111,17 @@ def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, A "price_history": [item.to_dict() for item in listing.price_history], "agency": listing.agency, "available_from": property_info.get("letDateAvailable", None), - }, + } + + if isinstance(listing, BuyListing): + if listing.service_charge is not None: + properties["service_charge"] = listing.service_charge + if listing.lease_left is not None: + properties["lease_left"] = listing.lease_left + + return { + "type": "Feature", + "properties": properties, "geometry": { "coordinates": [ listing.longitude,