wrongmove/crawler/repositories/listing_repository.py
2025-06-14 13:39:56 +00:00

185 lines
6.6 KiB
Python

from datetime import datetime, timedelta
from data_access import Listing
from models.listing import (
BuyListing,
FurnishType,
Listing as modelListing,
QueryParameters,
RentListing,
)
from sqlalchemy import Engine
from sqlmodel import Sequence, Session, and_, col, select
from sqlmodel.sql.expression import SelectOfScalar
from tqdm import tqdm
class ListingRepository:
engine: Engine
# anything more than 10k is considered buy type
buy_listing_price_threshold: int = 20_000
def __init__(self, engine: Engine):
self.engine = engine
async def get_listings(
self,
query_parameters: QueryParameters | None = None,
only_ids: list[int] | None = None,
limit: int | None = None,
) -> list[modelListing]:
"""
Get all listings from the database.
"""
only_ids = only_ids or []
query = select(
RentListing
) # TODO: one nice day I will think of a way to query both rent and buy
if only_ids:
query = query.where(RentListing.id.in_(only_ids)) # type: ignore
query = self._add_where_from_query_parameters(query, query_parameters)
if limit:
query = query.limit(limit)
with Session(self.engine) as session:
# query = select(modelListing)
return list(session.exec(query).all())
def _add_where_from_query_parameters(
self,
query: SelectOfScalar[RentListing],
query_parameters: QueryParameters | None = None,
) -> SelectOfScalar[RentListing]:
if query_parameters is None:
return query
query = query.where(
RentListing.number_of_bedrooms.between(
query_parameters.min_bedrooms, query_parameters.max_bedrooms
),
RentListing.price.between(
query_parameters.min_price, query_parameters.max_price
),
)
if query_parameters.min_sqm is not None:
query = query.where(RentListing.square_meters >= query_parameters.min_sqm)
if query_parameters.furnish_types:
query = query.where(
RentListing.furnish_type.in_(query_parameters.furnish_types)
)
if query_parameters.let_date_available_from is not None:
query = query.where(
RentListing.available_from >= query_parameters.let_date_available_from
)
if query_parameters.last_seen_days is not None:
last_seen_threshold = datetime.now() - timedelta(
days=query_parameters.last_seen_days
)
query = query.where(RentListing.last_seen >= last_seen_threshold)
return query
async def upsert_listings(
self,
listings: list[modelListing],
) -> list[modelListing]:
"""
Upsert listings into the database.
"""
models = []
with Session(self.engine) as session:
for listing in listings:
session.merge(listing)
models.append(listing)
session.commit()
return models
async def upsert_listings_legacy(
self,
listings: list[Listing],
) -> list[modelListing]:
"""
Upsert listings into the database.
"""
models = []
failed_to_upsert = []
with Session(self.engine) as session:
for listing in tqdm(listings, desc="Upserting listings"):
# Convert Listing to modelListing
try:
model_listing = await self._get_concrete_listing(listing)
except Exception as e: # WHY SO MANY ERORRS??
import ipdb
ipdb.set_trace()
# If for whatever reason we cannot add listing, ignore and retry
print(f"Error converting listing {listing.identifier}: {e}")
failed_to_upsert.append(listing)
continue
session.merge(model_listing)
models.append(model_listing)
session.commit()
print(f"Failed to upsert {len(failed_to_upsert)} listings.")
return models
async def _get_concrete_listing(
self,
listing: Listing,
) -> modelListing:
now = datetime.now()
if (
listing.detailobject is None
or listing.detailobject.get("property") is None
or listing.detailobject["property"].get("letFurnishType") is None
):
furnish_type_str = "unknown"
else:
furnish_type_str = listing.detailobject["property"]["letFurnishType"]
if furnish_type_str is None:
furnish_type_str = "unknown"
elif "landlord" in furnish_type_str.lower():
furnish_type_str = "ask landlord"
else:
furnish_type_str = furnish_type_str.lower()
furnish_type = FurnishType(furnish_type_str)
if listing.price < self.buy_listing_price_threshold:
model_listing = RentListing(
id=listing.identifier,
price=listing.price,
number_of_bedrooms=listing.bedrooms,
square_meters=await listing.sqm_ocr(),
agency=listing.agency,
council_tax_band=listing.councilTaxBand,
longtitude=listing.longtitude,
latitude=listing.latitude,
price_history_json=modelListing.serialize_price_history(
listing.priceHistory
),
listing_site=listing.listing_site,
last_seen=now,
photo_thumbnail=listing.photoThumbnail,
furnish_type=furnish_type,
available_from=listing.letDateAvailable,
additional_info=listing.detailobject,
)
else:
model_listing = BuyListing(
id=listing.identifier,
price=listing.price,
number_of_bedrooms=listing.bedrooms,
square_meters=await listing.sqm_ocr(),
agency=listing.agency,
council_tax_band=listing.councilTaxBand,
longtitude=listing.longtitude,
latitude=listing.latitude,
price_history_json=modelListing.serialize_price_history(
listing.priceHistory
),
listing_site=listing.listing_site,
last_seen=now,
photo_thumbnail=listing.photoThumbnail,
service_charge=listing.serviceCharge,
additional_info=listing.detailobject,
)
return model_listing