refactor the semaphore when dumping listings

This commit is contained in:
Viktor Barzin 2025-06-06 20:08:38 +00:00
parent b7a2ea75aa
commit 29213f3d26
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863
2 changed files with 34 additions and 29 deletions

View file

@ -1,6 +1,6 @@
import asyncio
from dataclasses import dataclass
import pathlib
from typing import Any
from rec.query import listing_query, QueryParameters
from rec.districts import get_districts
from data_access import Listing
@ -24,20 +24,7 @@ async def dump_listings(
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
json_responses = await asyncio.gather(
*[
listing_query(
page=i,
channel=parameters.listing_type,
min_bedrooms=parameters.min_bedrooms,
max_bedrooms=parameters.max_bedrooms,
radius=parameters.radius,
min_price=parameters.min_price,
max_price=parameters.max_price,
location_id=locid,
page_size=parameters.page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
semaphore=semaphore,
)
_dump_listings_with_semaphore(semaphore, i, parameters, locid)
for locid in districts.values()
for i in [1, 2]
]
@ -57,3 +44,26 @@ async def dump_listings(
listings.append(listing)
return listings
async def _dump_listings_with_semaphore(
semaphore: asyncio.Semaphore,
page_id: int,
parameters: QueryParameters,
location_id: str,
) -> dict[str, Any]:
async with semaphore:
listing = await listing_query(
page=page_id,
channel=parameters.listing_type,
min_bedrooms=parameters.min_bedrooms,
max_bedrooms=parameters.max_bedrooms,
radius=parameters.radius,
min_price=parameters.min_price,
max_price=parameters.max_price,
location_id=location_id,
page_size=parameters.page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
)
return listing

View file

@ -121,7 +121,6 @@ async def listing_query(
property_type: list[PropertyType] = [],
page_size: int = 25,
furnish_types: list[FurnishType] = [],
semaphore: asyncio.Semaphore | None = None,
) -> dict[str, Any]:
params: dict[str, str] = {
"locationIdentifier": location_id,
@ -164,16 +163,12 @@ async def listing_query(
"Connection": "keep-alive",
}
if semaphore is None:
semaphore = asyncio.Semaphore(1)
async with semaphore:
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
"https://api.rightmove.co.uk/api/property-listing",
params=params,
headers=headers,
) as response:
if response.status != 200:
raise Exception(f"Failed due to: {await response.text()}")
return await response.json()
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
"https://api.rightmove.co.uk/api/property-listing",
params=params,
headers=headers,
) as response:
if response.status != 200:
raise Exception(f"Failed due to: {await response.text()}")
return await response.json()