import asyncio
from dataclasses import dataclass
from datetime import datetime
import enum
from typing import Any

import aiohttp
from models.listing import FurnishType, ListingType, QueryParameters
from rec import districts
from tenacity import retry, wait_random

# Default request headers; used by detail_query.
headers = {
    "Host": "api.rightmove.co.uk",
    # 'Accept-Encoding': 'gzip, deflate, br',
    "User-Agent": "okhttp/4.10.0",
    "Connection": "close",
}

# The only values listing_query accepts for ``max_days_since_added``.
_VALID_MAX_DAYS = (1, 3, 7, 14)


class PropertyType(enum.StrEnum):
    """Property types that listing_query joins into ``propertyTypes``."""

    BUNGALOW = "bungalow"
    DETACHED = "detached"
    FLAT = "flat"
    LAND = "land"
    PARK_HOME = "park-home"
    SEMI_DETACHED = "semi-detached"
    TERRACED = "terraced"


# NOTE: no ``stop`` condition is configured, so with tenacity's defaults a
# failing call (including a non-200 response) is retried indefinitely, waiting
# a random 1-2 seconds between attempts.
@retry(wait=wait_random(min=1, max=2))
async def detail_query(detail_id: int) -> dict[str, Any]:
    """Fetch the JSON detail document for a single property.

    Args:
        detail_id: Identifier interpolated into the ``/api/property/{id}`` URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the response status is not 200. Because of the
            ``@retry`` decorator this triggers another attempt rather than
            propagating to the caller.
    """
    params = {
        "apiApplication": "ANDROID",
        "appVersion": "3.70.0",
    }
    url = f"https://api.rightmove.co.uk/api/property/{detail_id}"

    async with aiohttp.ClientSession(trust_env=True) as session:
        async with session.get(url, params=params, headers=headers) as response:
            if response.status != 200:
                body = await response.text()
                raise Exception(
                    f"id: {detail_id}. \nStatus Code: {response.status}."
                    f"Failed due to: {body}"
                )
            return await response.json()


async def listing_query(
    *,
    page: int,
    channel: ListingType,
    min_bedrooms: int,
    max_bedrooms: int,
    radius: float,
    min_price: int,
    max_price: int,
    district: str,  # = "STATION^5168", # kings cross station
    mustNewHome: bool = False,
    max_days_since_added: int | None = None,
    property_type: list[PropertyType] | None = None,
    page_size: int = 25,
    furnish_types: list[FurnishType] | None = None,
) -> dict[str, Any]:
    """Fetch one page of property listings.

    Args:
        page: Page number sent as ``page``.
        channel: Listing channel; its upper-cased string form is sent as
            ``channel``. BUY adds a ``dontShow`` filter, RENT enables
            ``furnish_types``.
        min_bedrooms: Sent as ``minBedrooms``.
        max_bedrooms: Sent as ``maxBedrooms``.
        radius: Sent as ``radius``.
        min_price: Sent as ``minPrice``.
        max_price: Sent as ``maxPrice``.
        district: Key looked up in ``districts.get_districts()`` to obtain the
            ``locationIdentifier``.
        mustNewHome: When true, sends ``mustHave=newHome``.
        max_days_since_added: One of 1, 3, 7 or 14, or ``None`` to send no
            ``maxDaysSinceAdded`` filter. The default is ``None`` because the
            previous default (30) was itself rejected by the validation below.
        property_type: Property types joined with commas into
            ``propertyTypes``; omitted when empty or ``None``.
        page_size: Sent as ``numberOfPropertiesPerPage``.
        furnish_types: Furnish types joined with commas into ``furnishTypes``;
            only sent for the RENT channel and when non-empty.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: If ``max_days_since_added`` is not ``None`` and not one of
            the accepted values.
        KeyError: If ``district`` is not a key of ``districts.get_districts()``.
        Exception: If the response status is not 200.
    """
    # Validate before doing any lookup or network work.
    if (
        max_days_since_added is not None
        and max_days_since_added not in _VALID_MAX_DAYS
    ):
        raise ValueError(
            f"Invalid max days - {max_days_since_added}. "
            f"Can only be one of {list(_VALID_MAX_DAYS)}"
        )

    params: dict[str, str] = {
        "locationIdentifier": districts.get_districts()[district],
        "channel": str(channel).upper(),
        "page": str(page),
        "numberOfPropertiesPerPage": str(page_size),
        "radius": str(radius),
        "sortBy": "distance",
        "includeUnavailableProperties": "false",
        "minPrice": str(min_price),
        "maxPrice": str(max_price),
        "minBedrooms": str(min_bedrooms),
        "maxBedrooms": str(max_bedrooms),
        "apiApplication": "ANDROID",
        "appVersion": "4.28.0",
    }

    # Optional filters are appended in a fixed order so the resulting query
    # string stays stable.
    if channel is ListingType.BUY:
        params["dontShow"] = "sharedOwnership,retirement"
    if property_type:
        params["propertyTypes"] = ",".join(property_type)
    if max_days_since_added is not None:
        # Only send the filter when requested; str(None) would otherwise be
        # transmitted as the literal "None".
        params["maxDaysSinceAdded"] = str(max_days_since_added)
    if mustNewHome:
        params["mustHave"] = "newHome"
    if channel is ListingType.RENT and furnish_types:
        params["furnishTypes"] = ",".join(furnish_types)

    # The listing endpoint uses its own header set; a distinct local name
    # avoids shadowing the module-level ``headers``.
    listing_headers = {
        "Host": "api.rightmove.co.uk",
        "Accept-Encoding": "gzip, deflate, br",
        "User-Agent": "okhttp/4.12.0",
        "Connection": "keep-alive",
    }

    async with aiohttp.ClientSession(trust_env=True) as session:
        async with session.get(
            "https://api.rightmove.co.uk/api/property-listing",
            params=params,
            headers=listing_headers,
        ) as response:
            if response.status != 200:
                raise Exception(f"Failed due to: {await response.text()}")
            return await response.json()