# from diskcache import Cache import enum from typing import Any import aiohttp class ListingType(enum.StrEnum): BUY = "BUY" RENT = "RENT" class FurnishType(enum.StrEnum): FURNISHED = "furnished" UNFURNISHED = "unfurnished" PART_FURNISHED = "partFurnished" headers = { "Host": "api.rightmove.co.uk", # 'Accept-Encoding': 'gzip, deflate, br', "User-Agent": "okhttp/4.10.0", "Connection": "close", } class PropertyType(enum.StrEnum): BUNGALOW = "bungalow" DETACHED = "detached" FLAT = "flat" LAND = "land" PARK_HOME = "park-home" SEMI_DETACHED = "semi-detached" TERRACED = "terraced" async def detail_query(detail_id: int): params = { "apiApplication": "ANDROID", "appVersion": "3.70.0", } url = f"https://api.rightmove.co.uk/api/property/{detail_id}" async with aiohttp.ClientSession() as session: async with session.get(url, params=params, headers=headers) as response: if response.status != 200: raise Exception( f"""id: {detail_id}. Status Code: {response.status}.""" f"""Failed due to: {await response.text()}""" ) return await response.json() async def listing_query( *, page: int, channel: ListingType, min_bedrooms: int, max_bedrooms: int, radius: float, min_price: int, max_price: int, location_id: str = "STATION^5168", # kings cross station mustNewHome: bool = False, max_days_since_added: int = 30, property_type: list[PropertyType] = [], page_size: int = 25, furnish_types: list[FurnishType] = [], ) -> dict[str, Any]: params: dict[str, str] = { "locationIdentifier": location_id, "channel": str(channel).upper(), "page": str(page), "numberOfPropertiesPerPage": str(page_size), "radius": str(radius), "sortBy": "distance", "includeUnavailableProperties": "false", "minPrice": str(min_price), "maxPrice": str(max_price), "minBedrooms": str(min_bedrooms), "maxBedrooms": str(max_bedrooms), "apiApplication": "ANDROID", "appVersion": "4.28.0", } if channel is ListingType.BUY: params["dontShow"] = "sharedOwnership,retirement" if len(property_type) > 0: params["propertyTypes"] = ",".join(property_type) if max_days_since_added is not None and max_days_since_added not in [ 1, 3, 7, 14, ]: raise Exception("Invalid max days. Can only be", [1, 3, 7, 14]) params["maxDaysSinceAdded"] = str(max_days_since_added) if mustNewHome: params["mustHave"] = "newHome" if channel is ListingType.RENT: if furnish_types: params["furnishTypes"] = ",".join(furnish_types) headers = { "Host": "api.rightmove.co.uk", "Accept-Encoding": "gzip, deflate, br", "User-Agent": "okhttp/4.12.0", "Connection": "keep-alive", } async with aiohttp.ClientSession() as session: async with session.get( "https://api.rightmove.co.uk/api/property-listing", params=params, headers=headers, ) as response: if response.status != 200: raise Exception(f"Failed due to: {await response.text()}") return await response.json()