Backend: include first 5 photo URLs from additional_info in GeoJSON streaming response, with fallback to photo_thumbnail. Frontend: replace single thumbnail with swipeable embla-carousel on compact cards. Remove window.open on card tap so clicking opens the detail bottom sheet instead of navigating to Rightmove.
180 lines
6.2 KiB
Python
180 lines
6.2 KiB
Python
import json
|
|
import logging
|
|
import pathlib
|
|
from typing import Any
|
|
|
|
from models.listing import QueryParameters, RentListing, BuyListing
|
|
from repositories.listing_repository import ListingRepository
|
|
|
|
logger = logging.getLogger("uvicorn.error")
|
|
|
|
|
|
def convert_row_to_geojson(row: dict[str, Any], listing_type: str = "RENT") -> dict[str, Any]:
|
|
"""Convert a projected row dict to GeoJSON Feature format.
|
|
|
|
This function handles dict rows from stream_listings_optimized(),
|
|
which uses column projection and returns dicts instead of model instances.
|
|
|
|
Args:
|
|
row: A dict with keys matching STREAMING_COLUMNS
|
|
|
|
Returns:
|
|
A GeoJSON Feature dict with properties and geometry
|
|
"""
|
|
# Parse price history from JSON string
|
|
price_history = []
|
|
if row.get('price_history_json'):
|
|
parsed = json.loads(row['price_history_json'])
|
|
price_history = [
|
|
{
|
|
"first_seen": p["first_seen"],
|
|
"last_seen": p["last_seen"],
|
|
"price": p["price"]
|
|
}
|
|
for p in parsed
|
|
]
|
|
|
|
sqm = row.get('square_meters')
|
|
price = row['price']
|
|
|
|
# Handle available_from which may be a datetime or None
|
|
available_from_val = row.get('available_from')
|
|
available_from_str = None
|
|
if available_from_val is not None:
|
|
if hasattr(available_from_val, 'isoformat'):
|
|
available_from_str = available_from_val.isoformat()
|
|
else:
|
|
available_from_str = str(available_from_val)
|
|
|
|
# Handle last_seen which should be a datetime
|
|
last_seen_val = row['last_seen']
|
|
if hasattr(last_seen_val, 'isoformat'):
|
|
last_seen_str = last_seen_val.isoformat()
|
|
else:
|
|
last_seen_str = str(last_seen_val)
|
|
|
|
# Extract first 5 photo URLs from additional_info
|
|
photos: list[str] = []
|
|
additional_info = row.get('additional_info')
|
|
if additional_info:
|
|
if isinstance(additional_info, str):
|
|
additional_info = json.loads(additional_info)
|
|
images = additional_info.get('property', {}).get('images', [])
|
|
photos = [img['url'] for img in images[:5] if isinstance(img, dict) and 'url' in img]
|
|
if not photos and row.get('photo_thumbnail'):
|
|
photos = [row['photo_thumbnail']]
|
|
|
|
properties: dict[str, Any] = {
|
|
"id": row['id'],
|
|
"listing_type": listing_type,
|
|
"city": "London",
|
|
"country": "United Kingdom",
|
|
"qm": sqm,
|
|
"qmprice": round(price / sqm, 2) if sqm else None,
|
|
"rooms": row['number_of_bedrooms'],
|
|
"total_price": price,
|
|
"url": f"https://www.rightmove.co.uk/properties/{row['id']}",
|
|
"photo_thumbnail": row.get('photo_thumbnail'),
|
|
"photos": photos,
|
|
"last_seen": last_seen_str,
|
|
"price_history": price_history,
|
|
"agency": row.get('agency'),
|
|
"available_from": available_from_str,
|
|
}
|
|
|
|
if row.get('service_charge') is not None:
|
|
properties["service_charge"] = row['service_charge']
|
|
if row.get('lease_left') is not None:
|
|
properties["lease_left"] = row['lease_left']
|
|
|
|
return {
|
|
"type": "Feature",
|
|
"properties": properties,
|
|
"geometry": {
|
|
"coordinates": [row['longitude'], row['latitude']],
|
|
"type": "Point",
|
|
},
|
|
}
|
|
|
|
|
|
def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, Any]:
|
|
"""Convert a single listing to GeoJSON Feature format.
|
|
|
|
Args:
|
|
listing: A RentListing or BuyListing model instance
|
|
|
|
Returns:
|
|
A GeoJSON Feature dict with properties and geometry
|
|
"""
|
|
# Safely access nested additional_info
|
|
property_info = listing.additional_info.get("property", {}) if listing.additional_info else {}
|
|
listing_type = "RENT" if isinstance(listing, RentListing) else "BUY"
|
|
|
|
# Extract first 5 photo URLs
|
|
images = property_info.get('images', [])
|
|
photos = [img['url'] for img in images[:5] if isinstance(img, dict) and 'url' in img]
|
|
if not photos and listing.photo_thumbnail:
|
|
photos = [listing.photo_thumbnail]
|
|
|
|
properties: dict[str, Any] = {
|
|
"id": listing.id,
|
|
"listing_type": listing_type,
|
|
"city": "London",
|
|
"country": "United Kingdom",
|
|
"qm": listing.square_meters,
|
|
"qmprice": listing.price_per_square_meter,
|
|
"rooms": listing.number_of_bedrooms,
|
|
"total_price": listing.price,
|
|
"url": listing.url,
|
|
"photo_thumbnail": listing.photo_thumbnail,
|
|
"photos": photos,
|
|
"last_seen": listing.last_seen.isoformat(),
|
|
"price_history": [item.to_dict() for item in listing.price_history],
|
|
"agency": listing.agency,
|
|
"available_from": property_info.get("letDateAvailable", None),
|
|
}
|
|
|
|
if isinstance(listing, BuyListing):
|
|
if listing.service_charge is not None:
|
|
properties["service_charge"] = listing.service_charge
|
|
if listing.lease_left is not None:
|
|
properties["lease_left"] = listing.lease_left
|
|
|
|
return {
|
|
"type": "Feature",
|
|
"properties": properties,
|
|
"geometry": {
|
|
"coordinates": [
|
|
listing.longitude,
|
|
listing.latitude,
|
|
],
|
|
"type": "Point",
|
|
},
|
|
}
|
|
|
|
|
|
async def export_immoweb(
|
|
repository: ListingRepository,
|
|
output_file: str | None = None,
|
|
query_parameters: QueryParameters | None = None,
|
|
limit: int | None = None,
|
|
):
|
|
listings = await repository.get_listings(
|
|
query_parameters=query_parameters,
|
|
limit=limit,
|
|
)
|
|
logger.info(f"Fetched {len(listings)} listings")
|
|
|
|
# Convert listings to GeoJSON features using the helper function
|
|
immoweb_listings = [convert_to_geojson_feature(listing) for listing in listings]
|
|
|
|
prefix = "var data = "
|
|
serialized_data = {"type": "FeatureCollection", "features": immoweb_listings}
|
|
result = prefix + json.dumps(serialized_data, indent=4)
|
|
|
|
if output_file:
|
|
output_file_path = pathlib.Path(output_file)
|
|
output_file_path.touch(exist_ok=True)
|
|
with open(str(output_file_path), "w") as f:
|
|
f.write(result)
|
|
return serialized_data
|