import json import logging import pathlib from typing import Any from models.listing import QueryParameters, RentListing, BuyListing from repositories.listing_repository import ListingRepository logger = logging.getLogger("uvicorn.error") def convert_row_to_geojson(row: dict[str, Any], listing_type: str = "RENT") -> dict[str, Any]: """Convert a projected row dict to GeoJSON Feature format. This function handles dict rows from stream_listings_optimized(), which uses column projection and returns dicts instead of model instances. Args: row: A dict with keys matching STREAMING_COLUMNS Returns: A GeoJSON Feature dict with properties and geometry """ # Parse price history from JSON string price_history = [] if row.get('price_history_json'): parsed = json.loads(row['price_history_json']) price_history = [ { "first_seen": p["first_seen"], "last_seen": p["last_seen"], "price": p["price"] } for p in parsed ] sqm = row.get('square_meters') price = row['price'] # Handle available_from which may be a datetime or None available_from_val = row.get('available_from') available_from_str = None if available_from_val is not None: if hasattr(available_from_val, 'isoformat'): available_from_str = available_from_val.isoformat() else: available_from_str = str(available_from_val) # Handle last_seen which should be a datetime last_seen_val = row['last_seen'] if hasattr(last_seen_val, 'isoformat'): last_seen_str = last_seen_val.isoformat() else: last_seen_str = str(last_seen_val) return { "type": "Feature", "properties": { "listing_type": listing_type, "city": "London", "country": "United Kingdom", "qm": sqm, "qmprice": round(price / sqm, 2) if sqm else None, "rooms": row['number_of_bedrooms'], "total_price": price, "url": f"https://www.rightmove.co.uk/properties/{row['id']}", "photo_thumbnail": row.get('photo_thumbnail'), "last_seen": last_seen_str, "price_history": price_history, "agency": row.get('agency'), "available_from": available_from_str, }, "geometry": { "coordinates": [row['longitude'], row['latitude']], "type": "Point", }, } def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, Any]: """Convert a single listing to GeoJSON Feature format. Args: listing: A RentListing or BuyListing model instance Returns: A GeoJSON Feature dict with properties and geometry """ # Safely access nested additional_info property_info = listing.additional_info.get("property", {}) if listing.additional_info else {} listing_type = "RENT" if isinstance(listing, RentListing) else "BUY" return { "type": "Feature", "properties": { "listing_type": listing_type, "city": "London", # change me "country": "United Kingdom", "qm": listing.square_meters, "qmprice": listing.price_per_square_meter, "rooms": listing.number_of_bedrooms, "total_price": listing.price, "url": listing.url, "photo_thumbnail": listing.photo_thumbnail, "last_seen": listing.last_seen.isoformat(), "price_history": [item.to_dict() for item in listing.price_history], "agency": listing.agency, "available_from": property_info.get("letDateAvailable", None), }, "geometry": { "coordinates": [ listing.longitude, listing.latitude, ], "type": "Point", }, } async def export_immoweb( repository: ListingRepository, output_file: str | None = None, query_parameters: QueryParameters | None = None, limit: int | None = None, ): listings = await repository.get_listings( query_parameters=query_parameters, limit=limit, ) logger.info(f"Fetched {len(listings)} listings") # Convert listings to GeoJSON features using the helper function immoweb_listings = [convert_to_geojson_feature(listing) for listing in listings] prefix = "var data = " serialized_data = {"type": "FeatureCollection", "features": immoweb_listings} result = prefix + json.dumps(serialized_data, indent=4) if output_file: output_file_path = pathlib.Path(output_file) output_file_path.touch(exist_ok=True) with open(str(output_file_path), "w") as f: f.write(result) return serialized_data