Backend: include first 5 photo URLs from additional_info in GeoJSON streaming response, with fallback to photo_thumbnail. Frontend: replace single thumbnail with swipeable embla-carousel on compact cards. Remove window.open on card tap so clicking opens the detail bottom sheet instead of navigating to Rightmove.
400 lines
15 KiB
Python
400 lines
15 KiB
Python
from datetime import datetime, timedelta
|
|
import logging
|
|
from typing import Any, Generator
|
|
from data_access import Listing
|
|
from models.listing import (
|
|
BuyListing,
|
|
FurnishType,
|
|
Listing as modelListing,
|
|
ListingType,
|
|
QueryParameters,
|
|
RentListing,
|
|
)
|
|
from sqlalchemy import Engine, func, select as sa_select
|
|
from sqlmodel import Session, select
|
|
from tqdm import tqdm
|
|
|
|
logger = logging.getLogger("uvicorn.error")
|
|
|
|
# Columns needed for GeoJSON streaming (excludes routing_info_json)
|
|
STREAMING_COLUMNS = [
|
|
'id', 'price', 'number_of_bedrooms', 'square_meters',
|
|
'longitude', 'latitude', 'photo_thumbnail', 'last_seen',
|
|
'agency', 'price_history_json', 'available_from',
|
|
'service_charge', 'lease_left', 'additional_info',
|
|
]
|
|
|
|
|
|
class ListingRepository:
|
|
engine: Engine
|
|
|
|
# Monthly rent prices in the UK are always below 20,000 GBP.
|
|
# Any listing priced at or above this threshold is treated as a purchase (buy) listing.
|
|
BUY_LISTING_PRICE_THRESHOLD: int = 20_000
|
|
|
|
def __init__(self, engine: Engine):
|
|
self.engine = engine
|
|
|
|
async def get_listings(
|
|
self,
|
|
query_parameters: QueryParameters | None = None,
|
|
only_ids: list[int] | None = None,
|
|
limit: int | None = None,
|
|
listing_type: ListingType | None = None,
|
|
) -> list[modelListing]:
|
|
"""
|
|
Get all listings from the database.
|
|
"""
|
|
only_ids = only_ids or []
|
|
|
|
model = self._get_model_for_query(query_parameters, listing_type=listing_type)
|
|
|
|
query = select(model)
|
|
if only_ids:
|
|
query = query.where(model.id.in_(only_ids)) # type: ignore
|
|
query = self._apply_query_filters(query, model, query_parameters)
|
|
if limit:
|
|
query = query.limit(limit)
|
|
|
|
with Session(self.engine) as session:
|
|
rows = list(session.exec(query).all())
|
|
logging.debug(f"Found {len(rows)} listings")
|
|
return rows
|
|
|
|
def stream_listings(
|
|
self,
|
|
query_parameters: QueryParameters | None = None,
|
|
limit: int | None = None,
|
|
chunk_size: int = 100,
|
|
) -> Generator[modelListing, None, None]:
|
|
"""Yield listings one at a time for streaming.
|
|
|
|
Uses yield_per for memory-efficient iteration over large result sets.
|
|
|
|
Args:
|
|
query_parameters: Filtering parameters
|
|
limit: Maximum number of listings to yield
|
|
chunk_size: Number of rows to fetch at a time from the database
|
|
"""
|
|
model = self._get_model_for_query(query_parameters)
|
|
|
|
query = select(model)
|
|
query = self._apply_query_filters(query, model, query_parameters)
|
|
if limit:
|
|
query = query.limit(limit)
|
|
|
|
with Session(self.engine) as session:
|
|
for listing in session.exec(query).yield_per(chunk_size):
|
|
yield listing
|
|
|
|
def _get_model_for_query(
|
|
self,
|
|
query_parameters: QueryParameters | None,
|
|
listing_type: ListingType | None = None,
|
|
) -> type[RentListing] | type[BuyListing]:
|
|
"""Get the appropriate model class based on query parameters or explicit listing type.
|
|
|
|
The explicit listing_type parameter takes precedence over query_parameters.
|
|
"""
|
|
if listing_type == ListingType.BUY:
|
|
return BuyListing
|
|
if listing_type is not None:
|
|
return RentListing
|
|
if query_parameters and query_parameters.listing_type == ListingType.BUY:
|
|
return BuyListing
|
|
return RentListing
|
|
|
|
def count_listings(self, query_parameters: QueryParameters | None = None) -> int:
|
|
"""Fast count for progress estimation."""
|
|
model = self._get_model_for_query(query_parameters)
|
|
|
|
query = sa_select(func.count(model.id))
|
|
query = self._apply_query_filters(query, model, query_parameters)
|
|
|
|
with Session(self.engine) as session:
|
|
return session.execute(query).scalar() or 0
|
|
|
|
def stream_listings_optimized(
|
|
self,
|
|
query_parameters: QueryParameters | None = None,
|
|
limit: int | None = None,
|
|
page_size: int = 100,
|
|
) -> Generator[dict, None, None]:
|
|
"""Stream listings with keyset pagination and column projection.
|
|
|
|
Uses keyset pagination for O(1) performance at any offset, and only
|
|
fetches columns needed for GeoJSON (excludes large JSON blobs).
|
|
|
|
Args:
|
|
query_parameters: Filtering parameters
|
|
limit: Maximum number of listings to yield
|
|
page_size: Number of rows to fetch per database round-trip
|
|
"""
|
|
model = self._get_model_for_query(query_parameters)
|
|
|
|
# Select only needed columns (excludes routing_info_json, additional_info)
|
|
columns = [
|
|
getattr(model, col) for col in STREAMING_COLUMNS if hasattr(model, col)
|
|
]
|
|
|
|
last_id: int | None = None
|
|
total_yielded = 0
|
|
|
|
while True:
|
|
if limit and total_yielded >= limit:
|
|
break
|
|
|
|
query = sa_select(*columns)
|
|
query = self._apply_query_filters(
|
|
query, model, query_parameters
|
|
)
|
|
|
|
# Keyset pagination: WHERE id > last_id (O(1) performance)
|
|
if last_id is not None:
|
|
query = query.where(model.id > last_id)
|
|
|
|
batch_limit = page_size
|
|
if limit:
|
|
batch_limit = min(page_size, limit - total_yielded)
|
|
query = query.order_by(model.id).limit(batch_limit)
|
|
|
|
with Session(self.engine) as session:
|
|
results = session.execute(query).fetchall()
|
|
|
|
if not results:
|
|
break
|
|
|
|
for row in results:
|
|
yield row._asdict()
|
|
last_id = row.id
|
|
total_yielded += 1
|
|
|
|
if len(results) < page_size:
|
|
break
|
|
|
|
def _apply_query_filters(
|
|
self,
|
|
query: Any,
|
|
model: type[RentListing] | type[BuyListing],
|
|
query_parameters: QueryParameters | None = None,
|
|
) -> Any:
|
|
"""Apply WHERE clauses from query parameters to a query.
|
|
|
|
Works with both SQLModel select() and raw SQLAlchemy sa_select() queries,
|
|
since both support the .where() interface.
|
|
|
|
Args:
|
|
query: A SQLModel or SQLAlchemy select query
|
|
model: The listing model class (RentListing or BuyListing)
|
|
query_parameters: Optional filtering parameters
|
|
|
|
Returns:
|
|
The query with WHERE clauses applied
|
|
"""
|
|
if query_parameters is None:
|
|
return query
|
|
query = query.where(
|
|
model.number_of_bedrooms.between(
|
|
query_parameters.min_bedrooms, query_parameters.max_bedrooms
|
|
),
|
|
model.price.between(query_parameters.min_price, query_parameters.max_price),
|
|
)
|
|
if query_parameters.min_sqm is not None:
|
|
query = query.where(model.square_meters >= query_parameters.min_sqm)
|
|
if query_parameters.max_sqm is not None:
|
|
query = query.where(model.square_meters <= query_parameters.max_sqm)
|
|
if query_parameters.min_price_per_sqm is not None:
|
|
query = query.where(
|
|
model.square_meters.is_not(None),
|
|
model.square_meters > 0,
|
|
(model.price / model.square_meters) >= query_parameters.min_price_per_sqm,
|
|
)
|
|
if query_parameters.max_price_per_sqm is not None:
|
|
query = query.where(
|
|
model.square_meters.is_not(None),
|
|
model.square_meters > 0,
|
|
(model.price / model.square_meters) <= query_parameters.max_price_per_sqm,
|
|
)
|
|
if query_parameters.furnish_types and model == RentListing:
|
|
query = query.where(model.furnish_type.in_(query_parameters.furnish_types))
|
|
if (
|
|
model == RentListing
|
|
and query_parameters.let_date_available_from is not None
|
|
):
|
|
query = query.where(
|
|
model.available_from >= query_parameters.let_date_available_from
|
|
)
|
|
if query_parameters.last_seen_days is not None:
|
|
last_seen_threshold = datetime.now() - timedelta(
|
|
days=query_parameters.last_seen_days
|
|
)
|
|
query = query.where(model.last_seen >= last_seen_threshold)
|
|
return query
|
|
|
|
async def upsert_listings(
|
|
self,
|
|
listings: list[modelListing],
|
|
) -> list[modelListing]:
|
|
"""
|
|
Upsert listings into the database.
|
|
"""
|
|
models = []
|
|
with Session(self.engine) as session:
|
|
for listing in listings:
|
|
session.merge(listing)
|
|
models.append(listing)
|
|
session.commit()
|
|
return models
|
|
|
|
async def upsert_listings_legacy(
|
|
self,
|
|
listings: list[Listing],
|
|
) -> list[modelListing]:
|
|
"""Upsert legacy Listing objects into the database.
|
|
|
|
.. deprecated::
|
|
This method converts legacy data_access.Listing objects to SQLModel
|
|
entities. Use upsert_listings() with RentListing/BuyListing directly.
|
|
|
|
Legacy Listing objects from filesystem-based data may contain malformed
|
|
or incomplete data, so conversion errors are logged and skipped rather
|
|
than aborting the entire batch.
|
|
"""
|
|
models = []
|
|
failed_to_upsert = []
|
|
with Session(self.engine) as session:
|
|
for listing in tqdm(listings, desc="Upserting listings"):
|
|
# Convert legacy Listing to the appropriate SQLModel entity
|
|
try:
|
|
model_listing = await self._get_concrete_listing(listing)
|
|
except Exception as e:
|
|
# Legacy Listing -> model conversion may fail for malformed data
|
|
# (e.g. missing required fields, invalid types). Log and skip.
|
|
logger.error(f"Error converting listing {listing.identifier}: {e}")
|
|
failed_to_upsert.append(listing)
|
|
continue
|
|
session.merge(model_listing)
|
|
models.append(model_listing)
|
|
session.commit()
|
|
if failed_to_upsert:
|
|
logger.warning(f"Failed to upsert {len(failed_to_upsert)} listings.")
|
|
return models
|
|
|
|
@staticmethod
|
|
def _parse_furnish_type(listing: Listing) -> FurnishType:
|
|
"""Extract and normalize the furnish type from a legacy Listing's detail object.
|
|
|
|
Handles missing/null detailobject, missing property key, missing or null
|
|
letFurnishType value, and normalizes "landlord" variants to ASK_LANDLORD.
|
|
|
|
Args:
|
|
listing: A legacy data_access.Listing object
|
|
|
|
Returns:
|
|
The parsed FurnishType enum value
|
|
"""
|
|
if (
|
|
listing.detailobject is None
|
|
or listing.detailobject.get("property") is None
|
|
or listing.detailobject["property"].get("letFurnishType") is None
|
|
):
|
|
return FurnishType.UNKNOWN
|
|
|
|
furnish_type_str = listing.detailobject["property"]["letFurnishType"]
|
|
if furnish_type_str is None:
|
|
return FurnishType.UNKNOWN
|
|
elif "landlord" in furnish_type_str.lower():
|
|
furnish_type_str = "ask landlord"
|
|
else:
|
|
furnish_type_str = furnish_type_str.lower()
|
|
|
|
return FurnishType(furnish_type_str)
|
|
|
|
async def _get_concrete_listing(
|
|
self,
|
|
listing: Listing,
|
|
) -> modelListing:
|
|
now = datetime.now()
|
|
furnish_type = self._parse_furnish_type(listing)
|
|
|
|
if listing.price < self.BUY_LISTING_PRICE_THRESHOLD:
|
|
model_listing = RentListing(
|
|
id=listing.identifier,
|
|
price=listing.price,
|
|
number_of_bedrooms=listing.bedrooms,
|
|
square_meters=await listing.sqm_ocr(),
|
|
agency=listing.agency,
|
|
council_tax_band=listing.councilTaxBand,
|
|
longitude=listing.longitude,
|
|
latitude=listing.latitude,
|
|
price_history_json=modelListing.serialize_price_history(
|
|
listing.priceHistory
|
|
),
|
|
listing_site=listing.listing_site,
|
|
last_seen=now,
|
|
photo_thumbnail=listing.photoThumbnail,
|
|
furnish_type=furnish_type,
|
|
available_from=listing.letDateAvailable,
|
|
additional_info=listing.detailobject,
|
|
)
|
|
else:
|
|
model_listing = BuyListing(
|
|
id=listing.identifier,
|
|
price=listing.price,
|
|
number_of_bedrooms=listing.bedrooms,
|
|
square_meters=await listing.sqm_ocr(),
|
|
agency=listing.agency,
|
|
council_tax_band=listing.councilTaxBand,
|
|
longitude=listing.longitude,
|
|
latitude=listing.latitude,
|
|
price_history_json=modelListing.serialize_price_history(
|
|
listing.priceHistory
|
|
),
|
|
listing_site=listing.listing_site,
|
|
last_seen=now,
|
|
photo_thumbnail=listing.photoThumbnail,
|
|
service_charge=listing.serviceCharge,
|
|
additional_info=listing.detailobject,
|
|
)
|
|
|
|
return model_listing
|
|
|
|
def get_last_updated(self) -> datetime | None:
|
|
"""Get the most recent last_seen timestamp across all listings.
|
|
|
|
Checks both RentListing and BuyListing tables and returns the latest.
|
|
"""
|
|
with Session(self.engine) as session:
|
|
rent_max = session.execute(
|
|
sa_select(func.max(RentListing.last_seen))
|
|
).scalar()
|
|
buy_max = session.execute(
|
|
sa_select(func.max(BuyListing.last_seen))
|
|
).scalar()
|
|
|
|
candidates = [t for t in (rent_max, buy_max) if t is not None]
|
|
return max(candidates) if candidates else None
|
|
|
|
def get_listing_ids(
|
|
self,
|
|
listing_type: ListingType = ListingType.RENT,
|
|
) -> set[int]:
|
|
"""Get all listing IDs from the database (ID-only projection).
|
|
|
|
Much faster than get_listings() when only IDs are needed for
|
|
filtering against API results.
|
|
"""
|
|
model = RentListing if listing_type == ListingType.RENT else BuyListing
|
|
with Session(self.engine) as session:
|
|
result = session.execute(sa_select(model.id))
|
|
return {row[0] for row in result.fetchall()}
|
|
|
|
async def mark_seen(self, listing_id: int, listing_type: ListingType = ListingType.RENT) -> None:
|
|
query_params = QueryParameters(listing_type=listing_type)
|
|
listings = await self.get_listings(only_ids=[listing_id], query_parameters=query_params)
|
|
if len(listings) == 0:
|
|
return
|
|
listing = listings[0]
|
|
now = datetime.now()
|
|
listing.last_seen = now
|
|
await self.upsert_listings([listing])
|