import enum from typing import Any import aiohttp from models.listing import FurnishType, ListingType from rec import districts from tenacity import retry, stop_after_attempt, wait_random headers = { "Host": "api.rightmove.co.uk", # 'Accept-Encoding': 'gzip, deflate, br', "User-Agent": "okhttp/4.10.0", "Connection": "close", } class PropertyType(enum.StrEnum): BUNGALOW = "bungalow" DETACHED = "detached" FLAT = "flat" LAND = "land" PARK_HOME = "park-home" SEMI_DETACHED = "semi-detached" TERRACED = "terraced" @retry(wait=wait_random(min=1, max=2), stop=stop_after_attempt(3)) async def detail_query(detail_id: int) -> dict[str, Any]: params = { "apiApplication": "ANDROID", "appVersion": "3.70.0", } url = f"https://api.rightmove.co.uk/api/property/{detail_id}" async with aiohttp.ClientSession(trust_env=True) as session: async with session.get(url, params=params, headers=headers) as response: if response.status != 200: raise Exception( f"""id: {detail_id}. Status Code: {response.status}.""" f"""Failed due to: {await response.text()}""" ) return await response.json() @retry(wait=wait_random(min=1, max=60), stop=stop_after_attempt(3)) async def listing_query( *, page: int, channel: ListingType, min_bedrooms: int, max_bedrooms: int, radius: float, min_price: int, max_price: int, district: str, # = "STATION^5168", # kings cross station mustNewHome: bool = False, max_days_since_added: int = 30, property_type: list[PropertyType] = [], page_size: int = 25, furnish_types: list[FurnishType] = [], ) -> dict[str, Any]: params: dict[str, str] = { "locationIdentifier": districts.get_districts()[district], "channel": str(channel).upper(), "page": str(page), "numberOfPropertiesPerPage": str(page_size), "radius": str(radius), "sortBy": "distance", "includeUnavailableProperties": "false", "minPrice": str(min_price), "maxPrice": str(max_price), "minBedrooms": str(min_bedrooms), "maxBedrooms": str(max_bedrooms), "apiApplication": "ANDROID", "appVersion": "4.28.0", } if channel is ListingType.BUY: params["dontShow"] = "sharedOwnership,retirement" if len(property_type) > 0: params["propertyTypes"] = ",".join(property_type) if max_days_since_added is not None and max_days_since_added not in [ 1, 3, 7, 14, ]: raise Exception( f"Invalid max days - {max_days_since_added} Can only be got", [1, 3, 7, 14], ) params["maxDaysSinceAdded"] = str(max_days_since_added) if mustNewHome: params["mustHave"] = "newHome" if channel is ListingType.RENT: if furnish_types: params["furnishTypes"] = ",".join(furnish_types) headers = { "Host": "api.rightmove.co.uk", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "okhttp/4.12.0", "Connection": "keep-alive", } async with aiohttp.ClientSession(trust_env=True) as session: async with session.get( "https://api.rightmove.co.uk/api/property-listing", params=params, headers=headers, ) as response: if response.status != 200: raise Exception(f"Failed due to: {await response.text()}") return await response.json()