"""Small client for the Rightmove mobile API (``api.rightmove.co.uk``).

Exposes two queries:

* ``detail_query``  - fetch the JSON document for a single property id.
* ``listing_query`` - fetch one page of for-sale listings around a location.

Both identify themselves as the Android application (see ``_APP_PARAMS`` and
``headers``) and return the decoded JSON body of the response.
"""

# from diskcache import Cache
import enum
from typing import List

import requests

# from rec.db import RightmoveListing
import urllib3

# SECURITY NOTE(review): every request below is sent with ``verify=False``
# (TLS certificate verification disabled) and the resulting urllib3 warnings
# are silenced here. Kept as-is to preserve behaviour, but this should be
# revisited before the module is used on an untrusted network.
urllib3.disable_warnings()

# cache = Cache(r"_cache")

headers = {
    "Host": "api.rightmove.co.uk",
    # 'Accept-Encoding': 'gzip, deflate, br',
    "User-Agent": "okhttp/4.10.0",
    "Connection": "close",
}

API_BASE_URL = "https://api.rightmove.co.uk/api"

# Query parameters sent with every request to identify the client application.
_APP_PARAMS = {
    "apiApplication": "ANDROID",
    "appVersion": "3.70.0",
}

# Values accepted for ``max_days_since_added`` in ``listing_query``.
_ALLOWED_MAX_DAYS = (1, 3, 7, 14)

# Without a timeout ``requests`` waits indefinitely on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 30


class RightmoveAPIError(Exception):
    """Raised when the API answers with a status code other than 200."""


class PropertyType(enum.StrEnum):
    """Property types that can be passed to ``listing_query``.

    Members are plain strings, so they can be joined directly into the
    comma-separated ``propertyTypes`` query parameter.
    """

    BUNGALOW = "bungalow"
    DETACHED = "detached"
    FLAT = "flat"
    LAND = "land"
    PARK_HOME = "park-home"
    SEMI_DETACHED = "semi-detached"
    TERRACED = "terraced"


def _get_json(url: str, params: dict, error_prefix: str = "") -> dict:
    """GET *url* with the shared headers and return the decoded JSON body.

    Raises:
        RightmoveAPIError: if the response status code is not 200. The
            message starts with *error_prefix* and includes the status code
            and response text.
        requests.exceptions.Timeout: if the server does not respond within
            ``REQUEST_TIMEOUT_SECONDS``.
    """
    response = requests.get(
        url,
        params=params,
        headers=headers,
        verify=False,  # see SECURITY NOTE at the top of the module
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise RightmoveAPIError(
            f"{error_prefix}Status Code: {response.status_code}. "
            f"Failed due to: {response.text}"
        )
    return response.json()


def detail_query(detail_id: int):
    """Fetch the JSON document for the property identified by *detail_id*.

    Raises:
        RightmoveAPIError: if the API does not answer with status 200.
    """
    return _get_json(
        f"{API_BASE_URL}/property/{detail_id}",
        params=dict(_APP_PARAMS),
        error_prefix=f"id: {detail_id}. ",
    )


# @cache.memoize()
def listing_query(
    page: int,
    min_bedrooms: int,
    max_bedrooms: int,
    radius: float,
    min_price: int,
    max_price: int,
    location_id: str = "STATION^5168",  # kings cross station
    mustNewHome: bool = False,
    max_days_since_added: int | None = None,
    property_type: List["PropertyType"] | None = None,
    page_size: int = 25,
) -> dict:
    """Fetch one page of "BUY" listings around *location_id*.

    Results are requested sorted by distance, excluding unavailable
    properties as well as shared-ownership and retirement listings.

    Args:
        page: Page number to request.
        min_bedrooms: Lower bound on bedroom count.
        max_bedrooms: Upper bound on bedroom count.
        radius: Search radius around the location.
        min_price: Lower bound on price.
        max_price: Upper bound on price.
        location_id: Rightmove location identifier.
        mustNewHome: When true, restrict results to new homes.
        max_days_since_added: Only include listings added within this many
            days; must be one of 1, 3, 7 or 14. ``None`` (or 0) applies no
            filter.
        property_type: Property types to include. ``None`` or an empty
            sequence applies no filter.
        page_size: Number of properties per page.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: if *max_days_since_added* is not an accepted value.
        RightmoveAPIError: if the API does not answer with status 200.
    """
    params = {
        "locationIdentifier": location_id,
        "channel": "BUY",
        "page": str(page),
        "numberOfPropertiesPerPage": str(page_size),
        "radius": str(radius),
        "sortBy": "distance",
        "includeUnavailableProperties": "false",
        "dontShow": "sharedOwnership,retirement",
        "minPrice": str(min_price),
        "maxPrice": str(max_price),
        "minBedrooms": str(min_bedrooms),
        "maxBedrooms": str(max_bedrooms),
        **_APP_PARAMS,
    }

    if property_type:
        params["propertyTypes"] = ",".join(property_type)

    if max_days_since_added:
        # Validate before touching the network so bad input fails fast.
        if max_days_since_added not in _ALLOWED_MAX_DAYS:
            raise ValueError(
                f"Invalid max days {max_days_since_added!r}. "
                f"Can only be one of {list(_ALLOWED_MAX_DAYS)}"
            )
        params["maxDaysSinceAdded"] = str(max_days_since_added)

    if mustNewHome:
        params["mustHave"] = "newHome"

    return _get_json(f"{API_BASE_URL}/property-listing", params=params)


def main() -> None:
    """Fetch the first page of a sample search and save each listing."""
    # NOTE(review): the module-level import of RightmoveListing is commented
    # out, which made this script fail with NameError. Importing it here keeps
    # the query functions usable without the DB layer - confirm that
    # ``rec.db`` is still the right location.
    from rec.db import RightmoveListing

    listings = listing_query(
        page=1,
        min_bedrooms=2,
        max_bedrooms=2,
        radius=5.0,
        min_price=150000,
        max_price=700000,
    )
    for item in listings["properties"]:
        RightmoveListing(
            id=item["identifier"],
            listing_json=item,
            price=item["price"],
            updated_timestamp=item["updateDate"],
            lat=item["latitude"],
            lon=item["longitude"],
        ).save()


if __name__ == "__main__":
    main()