refactor dump listings to start using model instead of the data_access object

This commit is contained in:
Viktor Barzin 2025-06-07 12:46:53 +00:00
parent 842f7cefbe
commit 4f5a934fa9
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863
5 changed files with 52 additions and 38 deletions

View file

@ -4,17 +4,18 @@ import pathlib
from typing import Any from typing import Any
from rec.query import detail_query, listing_query, QueryParameters from rec.query import detail_query, listing_query, QueryParameters
from rec.districts import get_districts from rec.districts import get_districts
import repositories from repositories import ListingRepository
from sqlalchemy import Engine from sqlalchemy import Engine
from tqdm.asyncio import tqdm from tqdm.asyncio import tqdm
from data_access import Listing from data_access import Listing
from models import Listing as modelListing
async def dump_listings( async def dump_listings(
parameters: QueryParameters, parameters: QueryParameters,
db_engine: Engine, repository: ListingRepository,
data_dir: pathlib.Path = pathlib.Path("data/rs/"), data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[Listing]: ) -> list[modelListing]:
if parameters.district_names: if parameters.district_names:
districts = { districts = {
district: locid district: locid
@ -28,7 +29,7 @@ async def dump_listings(
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
json_responses = await tqdm.gather( json_responses = await tqdm.gather(
*[ *[
_dump_listings_with_semaphore(semaphore, i, parameters, locid) _fetch_listings_with_semaphore(semaphore, i, parameters, locid)
for locid in districts.values() for locid in districts.values()
for i in [1, 2] for i in [1, 2]
], ],
@ -41,29 +42,36 @@ async def dump_listings(
for property in response_json["properties"]: for property in response_json["properties"]:
identifier = property["identifier"] identifier = property["identifier"]
listing = Listing(identifier, data_dir=data_dir) listing = Listing(identifier, data_dir=data_dir, _listing_object=property)
listing.dump_listing(property)
listings.append(listing) listings.append(listing)
# if listing is already in db, do not fetch details again # if listing is already in db, do not fetch details again
repository = repositories.ListingRepository(db_engine)
all_listings = await repository.get_listings( all_listings = await repository.get_listings(
only_ids=[listing.identifier for listing in listings] only_ids=[listing.identifier for listing in listings]
) )
all_listing_ids = {listing.id for listing in all_listings} all_listing_ids = {listing.id for listing in all_listings}
await tqdm.gather( listings_without_details = [
listing for listing in listings if not listing.path_detail_json().exists()
]
listing_details = await tqdm.gather(
*[ *[
_dump_detail_with_semaphore(semaphore, listing) _fetch_detail_with_semaphore(semaphore, listing.identifier)
for listing in listings for listing in listings_without_details
# if listing.identifier not in all_listing_ids # One day we will rely solely on the model data # if listing.identifier not in all_listing_ids # One day we will rely solely on the model data
], ],
desc="Fetching details", desc="Fetching details",
) )
return listings for listing, detail in zip(listings_without_details, listing_details):
listing._details_object = detail
model_listings = await repository.upsert_listings(listings) # upsert in db
await dump_listings_to_fs(listings)
return model_listings
async def _dump_listings_with_semaphore( async def _fetch_listings_with_semaphore(
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
page_id: int, page_id: int,
parameters: QueryParameters, parameters: QueryParameters,
@ -87,12 +95,18 @@ async def _dump_listings_with_semaphore(
return listing_query_result return listing_query_result
async def _dump_detail_with_semaphore(semaphore: asyncio.Semaphore, listing: Listing): async def _fetch_detail_with_semaphore(
if listing.path_detail_json().exists(): semaphore: asyncio.Semaphore, listing_id: int
return ) -> dict[str, Any]:
# for listing in tqdm(filtered_listings):
async with semaphore: async with semaphore:
d = await detail_query(listing.identifier) d = await detail_query(listing_id)
with open(listing.path_detail_json(), "w") as f: return d
json.dump(d, f)
async def dump_listings_to_fs(listings: list[Listing]) -> None:
    """Write each listing's JSON files to disk if they are not already there.

    For every listing:

    * if ``path_listing_json()`` does not exist, ``dump_listing()`` is called;
    * if ``path_detail_json()`` does not exist and the listing carries detail
      data in ``_details_object``, that data is written as indented JSON.

    Args:
        listings: Listings to persist. Each must provide ``path_listing_json()``,
            ``path_detail_json()``, ``dump_listing()`` and ``_details_object``.
    """
    for listing in listings:
        if not listing.path_listing_json().exists():
            listing.dump_listing()

        detail_path = listing.path_detail_json()
        if detail_path.exists():
            continue

        details = listing._details_object
        if details is None:
            # Nothing was fetched for this listing. Writing ``null`` would
            # create the file, and the existence check above would then skip
            # the listing on every later run, so leave the file absent.
            continue

        with open(detail_path, "w") as f:
            json.dump(details, f, indent=4)

View file

@ -13,7 +13,8 @@ import datetime
@dataclass() @dataclass()
class Listing: class Listing:
identifier: int identifier: int
_cached: Dict | None = None _details_object: dict[str, Any] | None = None
_listing_object: dict[str, Any] | None = None
data_dir: pathlib.Path = pathlib.Path("data/rs/") data_dir: pathlib.Path = pathlib.Path("data/rs/")
ALL_COLUMNS = [ ALL_COLUMNS = [
"identifier", "identifier",
@ -98,15 +99,19 @@ class Listing:
def path_price_history(self) -> pathlib.Path: def path_price_history(self) -> pathlib.Path:
return self.path_listing() / "price_history.json" return self.path_listing() / "price_history.json"
def dump_listing(self, d: dict): def dump_listing(self) -> None:
if self._listing_object is None:
raise ValueError("No listing data provided to dump.")
with open(self.path_listing_json(), "w") as f: with open(self.path_listing_json(), "w") as f:
json.dump(d, f) json.dump(self._listing_object, f)
with open(self.path_last_seen_listing(), "w") as f: with open(self.path_last_seen_listing(), "w") as f:
dt = datetime.datetime.now().isoformat() dt = datetime.datetime.now().isoformat()
json.dump(dt, f) json.dump(dt, f)
# some places list pw in price and others pcm # some places list pw in price and others pcm
price = max(d["price"], d.get("monthlyRent", 0)) price = max(
self._listing_object["price"], self._listing_object.get("monthlyRent", 0)
)
self.append_price_history(price) self.append_price_history(price)
def append_price_history(self, price: float) -> None: def append_price_history(self, price: float) -> None:
@ -249,19 +254,18 @@ class Listing:
@property @property
def listingobject(self): def listingobject(self):
if self._cached is None: with open(self.path_listing_json()) as f:
with open(self.path_listing_json()) as f: return json.load(f)
return json.load(f)
@property @property
def detailobject(self) -> dict[str, Any]: def detailobject(self) -> dict[str, Any]:
if self._cached is None: if self._details_object is None:
if self.path_detail_json().exists(): if self.path_detail_json().exists():
with open(self.path_detail_json()) as f: with open(self.path_detail_json()) as f:
self._cached = json.load(f) self._details_object = json.load(f)
else: else:
return {} return {}
return self._cached # type: ignore return self._details_object # type: ignore
@property @property
def price(self) -> float: def price(self) -> float:

View file

@ -155,11 +155,10 @@ def dump_listings(
f"{query_parameters}" f"{query_parameters}"
) )
data_dir_path = pathlib.Path(data_dir) data_dir_path = pathlib.Path(data_dir)
listings = asyncio.run(
dump_listings_module.dump_listings(query_parameters, engine, data_dir_path)
)
repository = ListingRepository(engine=engine) repository = ListingRepository(engine=engine)
asyncio.run(repository.upsert_listings(listings)) asyncio.run(
dump_listings_module.dump_listings(query_parameters, repository, data_dir_path)
)
@cli.command() @cli.command()

View file

@ -50,9 +50,6 @@ class RentListing(Listing, table=True):
class BuyListing(Listing, table=True): class BuyListing(Listing, table=True):
service_charge: float | None = Field(default=None, nullable=True) service_charge: float | None = Field(default=None, nullable=True)
council_tax_band: str | None = Field(
default=None, nullable=True
) # e.g., A, B, C, D, E, F, G
lease_left: int | None = Field( lease_left: int | None = Field(
default=None, nullable=True default=None, nullable=True
) # in years, e.g., 90, 80, etc. ) # in years, e.g., 90, 80, etc.

View file

@ -115,7 +115,7 @@ async def listing_query(
radius: float, radius: float,
min_price: int, min_price: int,
max_price: int, max_price: int,
location_id: str = "STATION^5168", # kings cross station location_id: str, # = "STATION^5168", # kings cross station
mustNewHome: bool = False, mustNewHome: bool = False,
max_days_since_added: int = 30, max_days_since_added: int = 30,
property_type: list[PropertyType] = [], property_type: list[PropertyType] = [],