wrongmove/crawler/rec/query.py

114 lines
3 KiB
Python

# from diskcache import Cache
import enum
from typing import List
import requests
# from rec.db import RightmoveListing
import urllib3
urllib3.disable_warnings()
# cache = Cache(r"_cache")
headers = {
"Host": "api.rightmove.co.uk",
# 'Accept-Encoding': 'gzip, deflate, br',
"User-Agent": "okhttp/4.10.0",
"Connection": "close",
}
class PropertyType(enum.StrEnum):
BUNGALOW = "bungalow"
DETACHED = "detached"
FLAT = "flat"
LAND = "land"
PARK_HOME = "park-home"
SEMI_DETACHED = "semi-detached"
TERRACED = "terraced"
def detail_query(detail_id: int):
params = {
"apiApplication": "ANDROID",
"appVersion": "3.70.0",
}
url = f"https://api.rightmove.co.uk/api/property/{detail_id}"
response = requests.get(url, params=params, headers=headers, verify=False)
if response.status_code != 200:
raise Exception(f"id: {detail_id}. Status Code: {response.status_code}. Failed due to: {response.text}")
return response.json()
# @cache.memoize()
def listing_query(
page: int,
min_bedrooms: int,
max_bedrooms: int,
radius: float,
min_price: int,
max_price: int,
location_id: str = "STATION^5168", # kings cross station
mustNewHome: bool = False,
max_days_since_added: int = None,
property_type: List["PropertyType"] = [],
page_size=25,
) -> dict:
params = {
"locationIdentifier": location_id,
"channel": "BUY",
"page": str(page),
"numberOfPropertiesPerPage": str(page_size),
"radius": str(radius),
"sortBy": "distance",
"includeUnavailableProperties": "false",
"dontShow": "sharedOwnership,retirement",
"minPrice": str(min_price),
"maxPrice": str(max_price),
"minBedrooms": str(min_bedrooms),
"maxBedrooms": str(max_bedrooms),
"apiApplication": "ANDROID",
"appVersion": "3.70.0",
}
if len(property_type) > 0:
params["propertyTypes"] = ",".join(property_type)
if max_days_since_added:
if max_days_since_added not in [1, 3, 7, 14]:
raise Exception("Invalid max days. Can only be", [1, 3, 7, 14])
params["maxDaysSinceAdded"] = max_days_since_added
if mustNewHome:
params["mustHave"] = "newHome"
response = requests.get(
"https://api.rightmove.co.uk/api/property-listing",
params=params,
headers=headers,
verify=False,
)
if response.status_code != 200:
raise Exception("Failed due to: ", response.text)
return response.json()
if __name__ == "__main__":
response = listing_query(
page=1,
min_bedrooms=2,
max_bedrooms=2,
radius=5.0,
min_price=150000,
max_price=700000,
)
resp = response
for d in resp["properties"]:
rl = RightmoveListing(
id=d["identifier"],
listing_json=d,
price=d["price"],
updated_timestamp=d["updateDate"],
lat=d["latitude"],
lon=d["longitude"],
)
rl.save()