wrongmove/crawler/ui_exporter.py

144 lines
4.8 KiB
Python

import json
import logging
import pathlib
from typing import Any
from models.listing import QueryParameters, RentListing, BuyListing
from repositories.listing_repository import ListingRepository
# Module-level logger registered under the "uvicorn.error" name
# (presumably so messages share uvicorn's handlers -- confirm against the server's logging config).
logger = logging.getLogger("uvicorn.error")
def convert_row_to_geojson(row: dict[str, Any], listing_type: str = "RENT") -> dict[str, Any]:
    """Convert a projected row dict to GeoJSON Feature format.

    This function handles dict rows from stream_listings_optimized(),
    which uses column projection and returns dicts instead of model instances.

    Args:
        row: A dict with keys matching STREAMING_COLUMNS. The keys 'price',
            'last_seen', 'number_of_bedrooms', 'id', 'longitude' and
            'latitude' are required; 'price_history_json', 'square_meters',
            'available_from', 'photo_thumbnail' and 'agency' are optional.
        listing_type: Value copied verbatim into the feature's
            "listing_type" property (defaults to "RENT").

    Returns:
        A GeoJSON Feature dict with properties and geometry. "qmprice" is
        None when the square meters are missing/zero or the price is None.

    Raises:
        KeyError: If a required key is missing from ``row`` or from an entry
            of the parsed price history.
        json.JSONDecodeError: If 'price_history_json' is not valid JSON.
    """
    # Parse price history from JSON string, keeping only the exported fields
    price_history = []
    raw_history = row.get('price_history_json')
    if raw_history:
        price_history = [
            {
                "first_seen": entry["first_seen"],
                "last_seen": entry["last_seen"],
                "price": entry["price"],
            }
            for entry in json.loads(raw_history)
        ]

    sqm = row.get('square_meters')
    price = row['price']
    # Guard both operands: a missing/zero area or a missing price has no
    # meaningful per-square-meter value (and would otherwise raise).
    qmprice = round(price / sqm, 2) if sqm and price is not None else None

    # available_from may be a datetime, another value, or None (kept as None)
    available_from_val = row.get('available_from')
    available_from_str = (
        None if available_from_val is None else _isoformat_or_str(available_from_val)
    )

    # last_seen should be a datetime; anything else is stringified as-is
    last_seen_str = _isoformat_or_str(row['last_seen'])

    return {
        "type": "Feature",
        "properties": {
            "listing_type": listing_type,
            "city": "London",
            "country": "United Kingdom",
            "qm": sqm,
            "qmprice": qmprice,
            "rooms": row['number_of_bedrooms'],
            "total_price": price,
            "url": f"https://www.rightmove.co.uk/properties/{row['id']}",
            "photo_thumbnail": row.get('photo_thumbnail'),
            "last_seen": last_seen_str,
            "price_history": price_history,
            "agency": row.get('agency'),
            "available_from": available_from_str,
        },
        "geometry": {
            # GeoJSON positions are ordered longitude first, then latitude
            "coordinates": [row['longitude'], row['latitude']],
            "type": "Point",
        },
    }


def _isoformat_or_str(value: Any) -> str:
    """Return ``value.isoformat()`` when the object offers it, else ``str(value)``."""
    if hasattr(value, 'isoformat'):
        return value.isoformat()
    return str(value)
def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, Any]:
    """Build a GeoJSON Feature dict from one listing model.

    Args:
        listing: A RentListing or BuyListing model instance

    Returns:
        A dict with "type", "properties" and "geometry" keys. The
        "listing_type" property is "RENT" for RentListing instances and
        "BUY" for anything else.
    """
    # additional_info may be empty or None; fall back to an empty mapping
    # so the "letDateAvailable" lookup below cannot fail on it.
    if listing.additional_info:
        property_details = listing.additional_info.get("property", {})
    else:
        property_details = {}

    if isinstance(listing, RentListing):
        kind = "RENT"
    else:
        kind = "BUY"

    properties = {
        "listing_type": kind,
        "city": "London",  # NOTE: hard-coded; flagged in the original as needing change
        "country": "United Kingdom",
        "qm": listing.square_meters,
        "qmprice": listing.price_per_square_meter,
        "rooms": listing.number_of_bedrooms,
        "total_price": listing.price,
        "url": listing.url,
        "photo_thumbnail": listing.photo_thumbnail,
        "last_seen": listing.last_seen.isoformat(),
        "price_history": [entry.to_dict() for entry in listing.price_history],
        "agency": listing.agency,
        "available_from": property_details.get("letDateAvailable"),
    }
    geometry = {
        # Longitude first, then latitude
        "coordinates": [listing.longitude, listing.latitude],
        "type": "Point",
    }
    return {"type": "Feature", "properties": properties, "geometry": geometry}
async def export_immoweb(
    repository: ListingRepository,
    output_file: str | None = None,
    query_parameters: QueryParameters | None = None,
    limit: int | None = None,
):
    """Fetch listings and export them as a GeoJSON FeatureCollection.

    Args:
        repository: Source of listings; its ``get_listings`` coroutine is
            awaited with ``query_parameters`` and ``limit``.
        output_file: Optional path. When truthy, the collection is written
            there as ``var data = <json>`` (JSON indented by 4 spaces).
        query_parameters: Passed through to ``repository.get_listings``.
        limit: Passed through to ``repository.get_listings``.

    Returns:
        The FeatureCollection dict (without the ``var data = `` prefix).
    """
    fetched = await repository.get_listings(
        query_parameters=query_parameters,
        limit=limit,
    )
    logger.info(f"Fetched {len(fetched)} listings")

    # One GeoJSON feature per listing, in the order the repository returned them
    feature_collection = {
        "type": "FeatureCollection",
        "features": list(map(convert_to_geojson_feature, fetched)),
    }
    # Serialized up front, even when nothing is written to disk
    js_payload = "var data = " + json.dumps(feature_collection, indent=4)

    if not output_file:
        return feature_collection

    target = pathlib.Path(output_file)
    target.touch(exist_ok=True)
    with target.open("w") as handle:
        handle.write(js_payload)
    return feature_collection