from datetime import datetime, timedelta
import logging
from typing import Any, Generator

from data_access import Listing
from models.listing import (
    BuyListing,
    FurnishType,
    Listing as modelListing,
    ListingType,
    QueryParameters,
    RentListing,
)
from sqlalchemy import Engine, func, select as sa_select
from sqlmodel import Session, select
from tqdm import tqdm

logger = logging.getLogger("uvicorn.error")

# Columns needed for GeoJSON streaming (excludes routing_info_json, additional_info)
STREAMING_COLUMNS = [
    'id',
    'price',
    'number_of_bedrooms',
    'square_meters',
    'longitude',
    'latitude',
    'photo_thumbnail',
    'last_seen',
    'agency',
    'price_history_json',
    'available_from',
    'service_charge',
    'lease_left',
]


class ListingRepository:
    """Database access layer for rent and buy listings.

    Every method opens its own short-lived ``Session`` on ``self.engine``.
    """

    engine: Engine

    # Monthly rent prices in the UK are always below 20,000 GBP.
    # Any listing priced at or above this threshold is treated as a purchase (buy) listing.
    BUY_LISTING_PRICE_THRESHOLD: int = 20_000

    def __init__(self, engine: Engine):
        self.engine = engine

    async def get_listings(
        self,
        query_parameters: QueryParameters | None = None,
        only_ids: list[int] | None = None,
        limit: int | None = None,
    ) -> list[modelListing]:
        """Load listings matching the given filters into a list.

        Args:
            query_parameters: Optional filtering parameters; also selects
                the rent or buy model (see ``_get_model_for_query``).
            only_ids: If non-empty, restrict the result to these IDs.
            limit: Maximum number of rows to return.

        Returns:
            The matching listing rows.
        """
        # NOTE(review): an empty/None ``only_ids`` means "no ID filter", not
        # "match nothing" - confirm callers rely on that.
        only_ids = only_ids or []
        model = self._get_model_for_query(query_parameters)

        query = select(model)
        if only_ids:
            query = query.where(model.id.in_(only_ids))  # type: ignore
        query = self._apply_query_filters(query, model, query_parameters)
        # NOTE(review): truthiness check, so ``limit=0`` applies no limit.
        if limit:
            query = query.limit(limit)

        with Session(self.engine) as session:
            rows = list(session.exec(query).all())

        # Use the module logger (not the root logger) like the rest of the class.
        logger.debug("Found %d listings", len(rows))
        return rows

    def stream_listings(
        self,
        query_parameters: QueryParameters | None = None,
        limit: int | None = None,
        chunk_size: int = 100,
    ) -> Generator[modelListing, None, None]:
        """Yield listings one at a time for streaming.

        Iterates the result with ``yield_per(chunk_size)`` so rows are
        fetched in chunks rather than all at once.

        Args:
            query_parameters: Filtering parameters
            limit: Maximum number of listings to yield
            chunk_size: Number of rows to fetch at a time from the database
        """
        model = self._get_model_for_query(query_parameters)
        query = self._apply_query_filters(select(model), model, query_parameters)
        if limit:
            query = query.limit(limit)

        # The session stays open for as long as the generator is being consumed.
        with Session(self.engine) as session:
            for listing in session.exec(query).yield_per(chunk_size):
                yield listing

    def _get_model_for_query(
        self, query_parameters: QueryParameters | None
    ) -> type[RentListing] | type[BuyListing]:
        """Get the appropriate model class based on query parameters.

        Returns ``BuyListing`` only when the parameters explicitly ask for
        ``ListingType.BUY``; otherwise (including ``None``) ``RentListing``.
        """
        if query_parameters and query_parameters.listing_type == ListingType.BUY:
            return BuyListing
        return RentListing

    def count_listings(self, query_parameters: QueryParameters | None = None) -> int:
        """Count listings matching the filters (used for progress estimation).

        Returns:
            The row count, or 0 if the query yields no scalar.
        """
        model = self._get_model_for_query(query_parameters)
        query = sa_select(func.count(model.id))
        query = self._apply_query_filters(query, model, query_parameters)
        with Session(self.engine) as session:
            return session.execute(query).scalar() or 0

    def stream_listings_optimized(
        self,
        query_parameters: QueryParameters | None = None,
        limit: int | None = None,
        page_size: int = 100,
    ) -> Generator[dict, None, None]:
        """Stream listings with keyset pagination and column projection.

        Pages through the table with ``WHERE id > last_id ORDER BY id``
        instead of OFFSET, and only fetches the columns named in
        ``STREAMING_COLUMNS`` (which excludes the large JSON blobs).

        Args:
            query_parameters: Filtering parameters
            limit: Maximum number of listings to yield
            page_size: Number of rows to fetch per database round-trip

        Yields:
            One dict per listing, keyed by column name.
        """
        model = self._get_model_for_query(query_parameters)

        # Select only needed columns; columns a model does not define
        # are skipped.
        columns = [
            getattr(model, col) for col in STREAMING_COLUMNS if hasattr(model, col)
        ]

        last_id: int | None = None
        total_yielded = 0

        while True:
            if limit and total_yielded >= limit:
                break

            query = sa_select(*columns)
            query = self._apply_query_filters(query, model, query_parameters)

            # Keyset pagination: resume strictly after the last row seen.
            if last_id is not None:
                query = query.where(model.id > last_id)

            # Never fetch more rows than the caller's remaining budget.
            batch_limit = page_size
            if limit:
                batch_limit = min(page_size, limit - total_yielded)

            query = query.order_by(model.id).limit(batch_limit)

            # Materialise the page so the session is closed before yielding.
            with Session(self.engine) as session:
                results = session.execute(query).fetchall()

            if not results:
                break

            for row in results:
                yield row._asdict()
                last_id = row.id
                total_yielded += 1

            # A short page means the table is exhausted (or the limit was hit).
            if len(results) < page_size:
                break

    def _apply_query_filters(
        self,
        query: Any,
        model: type[RentListing] | type[BuyListing],
        query_parameters: QueryParameters | None = None,
    ) -> Any:
        """Apply WHERE clauses from query parameters to a query.

        Works with both SQLModel select() and raw SQLAlchemy sa_select()
        queries, since both support the .where() interface.

        Args:
            query: A SQLModel or SQLAlchemy select query
            model: The listing model class (RentListing or BuyListing)
            query_parameters: Optional filtering parameters

        Returns:
            The query with WHERE clauses applied
        """
        if query_parameters is None:
            return query

        # Bedroom and price ranges are always applied.
        query = query.where(
            model.number_of_bedrooms.between(
                query_parameters.min_bedrooms, query_parameters.max_bedrooms
            ),
            model.price.between(query_parameters.min_price, query_parameters.max_price),
        )

        if query_parameters.min_sqm is not None:
            query = query.where(model.square_meters >= query_parameters.min_sqm)
        if query_parameters.max_sqm is not None:
            query = query.where(model.square_meters <= query_parameters.max_sqm)

        # Price-per-sqm filters exclude rows with missing or non-positive
        # area so the division is always well-defined.
        if query_parameters.min_price_per_sqm is not None:
            query = query.where(
                model.square_meters.is_not(None),
                model.square_meters > 0,
                (model.price / model.square_meters)
                >= query_parameters.min_price_per_sqm,
            )
        if query_parameters.max_price_per_sqm is not None:
            query = query.where(
                model.square_meters.is_not(None),
                model.square_meters > 0,
                (model.price / model.square_meters)
                <= query_parameters.max_price_per_sqm,
            )

        # Furnish type and availability date only apply to rent listings.
        if query_parameters.furnish_types and model == RentListing:
            query = query.where(model.furnish_type.in_(query_parameters.furnish_types))
        if (
            model == RentListing
            and query_parameters.let_date_available_from is not None
        ):
            query = query.where(
                model.available_from >= query_parameters.let_date_available_from
            )

        if query_parameters.last_seen_days is not None:
            # NOTE(review): naive local time - assumes last_seen is stored
            # the same way (mark_seen / _get_concrete_listing also use now()).
            last_seen_threshold = datetime.now() - timedelta(
                days=query_parameters.last_seen_days
            )
            query = query.where(model.last_seen >= last_seen_threshold)

        return query

    async def upsert_listings(
        self,
        listings: list[modelListing],
    ) -> list[modelListing]:
        """Upsert listings into the database.

        Each listing is merged into one session and all of them are
        committed together at the end.

        Args:
            listings: Model instances to insert or update.

        Returns:
            A new list containing the same instances that were passed in.
        """
        with Session(self.engine) as session:
            for listing in listings:
                session.merge(listing)
            session.commit()
        return list(listings)

    async def upsert_listings_legacy(
        self,
        listings: list[Listing],
    ) -> list[modelListing]:
        """Upsert legacy Listing objects into the database.

        .. deprecated::
            This method converts legacy data_access.Listing objects to SQLModel
            entities. Use upsert_listings() with RentListing/BuyListing directly.

        Legacy Listing objects from filesystem-based data may contain malformed
        or incomplete data, so conversion errors are logged and skipped rather
        than aborting the entire batch.

        Returns:
            The model instances that were successfully converted and merged.
        """
        models = []
        failed_to_upsert = []
        with Session(self.engine) as session:
            for listing in tqdm(listings, desc="Upserting listings"):
                # Convert legacy Listing to the appropriate SQLModel entity
                try:
                    model_listing = await self._get_concrete_listing(listing)
                except Exception as e:
                    # Legacy Listing -> model conversion may fail for malformed data
                    # (e.g. missing required fields, invalid types). Log and skip.
                    logger.error(
                        "Error converting listing %s: %s", listing.identifier, e
                    )
                    failed_to_upsert.append(listing)
                    continue

                session.merge(model_listing)
                models.append(model_listing)
            session.commit()

        if failed_to_upsert:
            logger.warning("Failed to upsert %d listings.", len(failed_to_upsert))
        return models

    @staticmethod
    def _parse_furnish_type(listing: Listing) -> FurnishType:
        """Extract and normalize the furnish type from a legacy Listing's detail object.

        Handles missing/null detailobject, missing property key, missing or
        null letFurnishType value, and normalizes "landlord" variants to
        ASK_LANDLORD.

        Args:
            listing: A legacy data_access.Listing object

        Returns:
            The parsed FurnishType enum value

        Raises:
            ValueError: If the lower-cased value is not a FurnishType member
                value (raised by the ``FurnishType(...)`` lookup).
        """
        detail = listing.detailobject
        if detail is None:
            return FurnishType.UNKNOWN

        property_info = detail.get("property")
        if property_info is None:
            return FurnishType.UNKNOWN

        raw_value = property_info.get("letFurnishType")
        if raw_value is None:
            return FurnishType.UNKNOWN

        normalized = raw_value.lower()
        # Any value mentioning "landlord" collapses to the single
        # "ask landlord" value.
        if "landlord" in normalized:
            normalized = "ask landlord"
        return FurnishType(normalized)

    async def _get_concrete_listing(
        self,
        listing: Listing,
    ) -> modelListing:
        """Convert a legacy Listing into a RentListing or BuyListing.

        The listing is classified by price: below
        ``BUY_LISTING_PRICE_THRESHOLD`` it becomes a ``RentListing``,
        otherwise a ``BuyListing``. ``last_seen`` is set to the current time.

        Args:
            listing: A legacy data_access.Listing object

        Returns:
            The populated (not yet persisted) model instance.
        """
        is_rent = listing.price < self.BUY_LISTING_PRICE_THRESHOLD

        # Fields common to both listing models.
        shared_fields: dict[str, Any] = {
            "id": listing.identifier,
            "price": listing.price,
            "number_of_bedrooms": listing.bedrooms,
            "square_meters": await listing.sqm_ocr(),
            "agency": listing.agency,
            "council_tax_band": listing.councilTaxBand,
            "longitude": listing.longitude,
            "latitude": listing.latitude,
            "price_history_json": modelListing.serialize_price_history(
                listing.priceHistory
            ),
            "listing_site": listing.listing_site,
            "last_seen": datetime.now(),
            "photo_thumbnail": listing.photoThumbnail,
            "additional_info": listing.detailobject,
        }

        if is_rent:
            # Furnish type is only parsed for rentals, so an unrecognised
            # letFurnishType cannot make a buy listing fail conversion.
            return RentListing(
                **shared_fields,
                furnish_type=self._parse_furnish_type(listing),
                available_from=listing.letDateAvailable,
            )
        return BuyListing(
            **shared_fields,
            service_charge=listing.serviceCharge,
        )

    def get_listing_ids(
        self,
        listing_type: ListingType = ListingType.RENT,
    ) -> set[int]:
        """Get all listing IDs from the database (ID-only projection).

        Selects only the ``id`` column, which avoids loading full rows when
        only IDs are needed for filtering against API results.

        Args:
            listing_type: RENT selects ``RentListing``; any other value
                selects ``BuyListing``.

        Returns:
            The set of all IDs stored for that listing type.
        """
        model = RentListing if listing_type == ListingType.RENT else BuyListing
        with Session(self.engine) as session:
            result = session.execute(sa_select(model.id))
            return {row[0] for row in result.fetchall()}

    async def mark_seen(
        self,
        listing_id: int,
        listing_type: ListingType = ListingType.RENT,
    ) -> None:
        """Set ``last_seen`` to now for one listing, if it exists.

        Does nothing when no listing with ``listing_id`` is found.

        Args:
            listing_id: ID of the listing to touch.
            listing_type: Which listing table to look in.
        """
        # NOTE(review): the lookup also applies QueryParameters' default
        # filters (bedrooms/price ranges) - confirm the defaults cannot
        # exclude the target listing.
        query_params = QueryParameters(listing_type=listing_type)
        listings = await self.get_listings(
            only_ids=[listing_id], query_parameters=query_params
        )
        if not listings:
            return

        listing = listings[0]
        listing.last_seen = datetime.now()
        await self.upsert_listings([listing])