Compare commits

...

10 commits

13 changed files with 844 additions and 33 deletions

View file

@@ -27,7 +27,7 @@ async def dump_images(
@retry(wait=wait_random(min=1, max=2), stop=stop_after_attempt(3))
async def dump_images_for_listing(listing: Listing, base_path: Path) -> Listing | None:
all_floorplans = listing.additional_info["property"]["floorplans"]
all_floorplans = listing.additional_info.get("property", {}).get("floorplans", [])
for floorplan in all_floorplans:
url = floorplan["url"]
picname = url.split("/")[-1]

View file

@@ -15,6 +15,7 @@ from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Query
from api.auth import User
from models.listing import QueryParameters
from notifications import send_notification
from rec import districts
from redis_repository import RedisRepository
from repositories.listing_repository import ListingRepository
@@ -57,6 +58,11 @@ app.add_middleware(
)
@app.get("/api/status")
async def get_status():
return {"status": "OK"}
@app.get("/api/listing")
async def get_listing(user: Annotated[User, Depends(get_current_user)]):
repository = ListingRepository(engine)
@@ -82,6 +88,11 @@ async def refresh_listings(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Query()],
) -> dict[str, str]:
await send_notification(
f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
)
# await listing_tasks.async_dump_listings_task(query_parameters.model_dump_json()) # Use this for local debugging - run task in sync
# return {}
# TODO: rate limit
expiry_time = datetime.now() + timedelta(minutes=10)
task = listing_tasks.dump_listings_task.apply_async(

crawler/api/metrics.py (new file, 17 lines)
View file

@@ -0,0 +1,17 @@
# metrics.py
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import make_asgi_app
# Set up Prometheus reader and meter provider
reader = PrometheusMetricReader()
provider = MeterProvider(
metric_readers=[reader],
resource=Resource.create({SERVICE_NAME: "fastapi-metrics-app"}),
)
set_meter_provider(provider)
# Expose the Prometheus metrics endpoint
metrics_app = make_asgi_app() # Exposes /metrics
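The module above only builds the exporter; this diff does not show it being mounted. A minimal wiring sketch (hypothetical — the actual hook-up in main.py is not part of this change set; only `metrics_app` and the instrumentation packages come from it):

# Hypothetical wiring, assumed to live next to the FastAPI app in main.py.
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from api.metrics import metrics_app  # module added above

app = FastAPI()
app.mount("/metrics", metrics_app)       # Prometheus scrapes this endpoint
FastAPIInstrumentor.instrument_app(app)  # record HTTP metrics via the global meter provider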

View file

@@ -110,7 +110,7 @@ class Listing:
# some places list pw in price and others pcm
price = max(
self._listing_object["price"],
self._listing_object["price"] or 0,
self._listing_object.get("monthlyRent", 0) or 0,
)
self.append_price_history(price)

View file

@@ -256,7 +256,9 @@ export function Map(
const lastSeenDays = Math.round((new Date() - new Date(lastSeenStr)) / (1000 * 60 * 60 * 24));
return <div key={property.properties.url} style={{ display: 'flex', flexWrap: 'wrap' }}>
<div className="propertyListingPopupItem" style={{ width: '100%' }}>
<a href={property.properties.url} target="_blank">
<img src={property.properties.photo_thumbnail} />
</a>
</div>
<div className="propertyListingPopupItem">
<strong>Available from:</strong>

View file

@@ -0,0 +1,201 @@
from __future__ import annotations
from abc import abstractmethod
import asyncio
from datetime import datetime
import logging
import multiprocessing
from pathlib import Path
import aiohttp
from models.listing import FurnishType, Listing, ListingSite, RentListing
from rec import floorplan
from rec.query import detail_query
from repositories.listing_repository import ListingRepository
logger = logging.getLogger("uvicorn.error")
class ListingProcessor:
semaphore: asyncio.Semaphore
process_steps: list[Step]
def __init__(self, listing_repository: ListingRepository):
self.semaphore = asyncio.Semaphore(20)
# Register new processing steps here
# Order is important
self.process_steps = [
FetchListingDetailsStep(listing_repository),
FetchImagesStep(listing_repository),
DetectFloorplanStep(listing_repository),
]
async def process_listing(self, listing_id: int) -> Listing | None:
listing = None
for step in self.process_steps:
if await step.needs_processing(listing_id):
async with self.semaphore:
listing = await step.process(listing_id)
return listing
async def listing_exists(self, listing_id: int) -> bool: ...
class Step:
listing_repository: ListingRepository
def __init__(self, listing_repository: ListingRepository):
self.listing_repository = listing_repository
@abstractmethod
async def process(self, listing_id: int) -> Listing: ...
@abstractmethod
async def needs_processing(self, listing_id: int) -> bool:
return True
class FetchListingDetailsStep(Step):
async def needs_processing(self, listing_id: int) -> bool:
existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id]
)
if len(existing_listings) == 0:
return True
return False
async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching details for {listing_id=}")
existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id]
)
now = datetime.now()
if len(existing_listings) > 0:
# listing exists, do not refresh
return existing_listings[0]
listing_details = await detail_query(listing_id)
furnish_type_str = listing_details["property"].get("letFurnishType", "unknown")
if furnish_type_str is None:
furnish_type_str = "unknown"
elif "landlord" in furnish_type_str.lower():
furnish_type_str = "ask landlord"
else:
furnish_type_str = furnish_type_str.lower()
furnish_type = FurnishType(furnish_type_str)
available_from: datetime | None = None
available_from_str: str | None = listing_details["property"]["letDateAvailable"]
if available_from_str is None:
available_from = None
elif available_from_str.lower() == "now":
available_from = datetime.now()
else:
try:
available_from = datetime.strptime(available_from_str, "%d/%m/%Y")
except ValueError:
# If the date format is not as expected, return None
available_from = None
photos = listing_details["property"]["photos"]
# listing = Listing(
listing = RentListing( # TODO: should pick based on price?
id=listing_id,
price=listing_details["property"]["price"],
number_of_bedrooms=listing_details["property"]["bedrooms"],
square_meters=None, # populated later
agency=listing_details["property"]["branch"]["brandName"],
council_tax_band=listing_details["property"]["councilTaxInfo"]["content"][
0
]["value"],
longtitude=listing_details["property"]["longitude"],
latitude=listing_details["property"]["latitude"],
price_history_json="{}", # TODO: should upsert from existing
listing_site=ListingSite.RIGHTMOVE,
last_seen=now,
photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None,
furnish_type=furnish_type,
available_from=available_from,
additional_info=listing_details,
)
await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed fetching details for {listing_id=}")
# TODO: dump to filesystem
return listing
class FetchImagesStep(Step):
async def needs_processing(self, listing_id: int) -> bool:
existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id]
)
if len(existing_listings) == 0:
return False # if listing doesn't exist, we can't process it
listing = existing_listings[0]
return len(listing.floorplan_image_paths) == 0
async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching images for {listing_id=}")
existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id]
)
if len(existing_listings) == 0:
raise ValueError(f"Listing {listing_id} not found")
listing = existing_listings[0]
base_path = Path("data/rs/")
all_floorplans = listing.additional_info.get("property", {}).get(
"floorplans", []
)
for floorplan in all_floorplans:
url = floorplan["url"]
picname = url.split("/")[-1]
floorplan_path = Path(base_path, str(listing.id), "floorplans", picname)
if floorplan_path.exists():
continue
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 404:
return listing
if response.status != 200:
raise Exception(f"Error for {url}: {response.status}")
floorplan_path.parent.mkdir(parents=True, exist_ok=True)
with open(floorplan_path, "wb") as f:
f.write(await response.read())
listing.floorplan_image_paths.append(str(floorplan_path))
await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed fetching images for {listing_id=}")
return listing
class DetectFloorplanStep(Step):
ocr_semaphore: asyncio.Semaphore
def __init__(self, listing_repository: ListingRepository):
super().__init__(listing_repository)
self.ocr_semaphore = asyncio.Semaphore(multiprocessing.cpu_count() // 4)
async def needs_processing(self, listing_id: int) -> bool:
listings = await self.listing_repository.get_listings(only_ids=[listing_id])
if len(listings) == 0:
return False
return listings[0].square_meters is None
async def process(self, listing_id: int) -> Listing:
logger.debug(f"Running floorplan detection for {listing_id=}")
listings = await self.listing_repository.get_listings(only_ids=[listing_id])
if len(listings) == 0:
raise ValueError(f"Listing {listing_id} does not exist")
listing = listings[0]
sqms = []
for floorplan_path in listing.floorplan_image_paths:
async with self.ocr_semaphore:
estimated_sqm, _ = await asyncio.to_thread(
floorplan.calculate_ocr, floorplan_path
)
if estimated_sqm is not None:
sqms.append(estimated_sqm)
max_sqm = max(sqms, default=0) # try once, if we fail, keep as 0
# if max_sqm is not None:
listing.square_meters = max_sqm
await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed running floorplan detection for {listing_id=}")
return listing
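A minimal driver sketch for the pipeline above (hypothetical; the real caller is the Celery task further down in this diff, and `engine` is assumed to come from the repo's `database` module):

# Hypothetical standalone driver for ListingProcessor, not part of this change set.
import asyncio

from database import engine
from listing_processor import ListingProcessor
from repositories.listing_repository import ListingRepository


async def process_batch(listing_ids: set[int]) -> None:
    processor = ListingProcessor(ListingRepository(engine))
    # Each id runs through the registered steps in order; steps whose
    # needs_processing() returns False are skipped, and the processor's
    # semaphore caps concurrent step execution at 20.
    results = await asyncio.gather(*(processor.process_listing(i) for i in listing_ids))
    print(f"processed {sum(r is not None for r in results)} of {len(listing_ids)} listings")

# asyncio.run(process_batch({164512345, 160098765}))  # example ids, hypothetical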

View file

@@ -6,6 +6,7 @@ import pathlib
import click
import importlib
import listing_processor
from models.listing import FurnishType, ListingType, QueryParameters
from rec.districts import get_districts
from data_access import Listing

crawler/notifications.py (new file, 30 lines)
View file

@@ -0,0 +1,30 @@
from abc import abstractmethod
from enum import StrEnum
import apprise
from functools import lru_cache
import os
class Surface:
@abstractmethod
def connection_string(self) -> str | None: ...
class Slack(Surface):
def connection_string(self) -> str | None:
return os.environ.get("SLACK_WEBHOOK_URL")
@lru_cache(maxsize=None)
def get_notifier(surfaces: list[Surface] | None = None) -> apprise.Apprise:
surfaces = surfaces or list[Surface]([Slack()])
obj = apprise.Apprise()
for surface in surfaces:
if conn := surface.connection_string():
obj.add(conn)
return obj
async def send_notification(body: str, title: str = "") -> bool:
notifier = get_notifier()
return await notifier.async_notify(body=body, title=title)
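Usage sketch (assuming SLACK_WEBHOOK_URL is set to an Apprise-recognised Slack URL, e.g. a hooks.slack.com webhook; the call site in main.py appears earlier in this diff):

# Minimal usage sketch, not part of this change set.
import asyncio

from notifications import send_notification


async def main() -> None:
    ok = await send_notification("3 new listings found", title="crawler")
    # async_notify reports delivery success; falsy when no surface accepted the message
    print("sent" if ok else "not sent")


asyncio.run(main())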

crawler/poetry.lock (generated, 384 lines changed)
View file

@@ -217,6 +217,26 @@ files = [
{file = "appnope-0.1.4.tar.gz", hash = "sha256:1de3860566df9caf38f01f86f65e0e13e379af54f9e4bee1e66b48f2efffd1ee"},
]
[[package]]
name = "apprise"
version = "1.9.3"
description = "Push Notifications that work with just about every platform!"
optional = false
python-versions = ">=3.6"
groups = ["main"]
files = [
{file = "apprise-1.9.3-py3-none-any.whl", hash = "sha256:e9b5abb73244c21a30ee493860f8d4ae80665d225b1b436179d48db4f6fc5b9e"},
{file = "apprise-1.9.3.tar.gz", hash = "sha256:f583667ea35b8899cd46318c6cb26f0faf6a4605b119174c2523a012590c65a6"},
]
[package.dependencies]
certifi = "*"
click = ">=5.0"
markdown = "*"
PyYAML = "*"
requests = "*"
requests-oauthlib = "*"
[[package]]
name = "argon2-cffi"
version = "25.1.0"
@@ -290,6 +310,21 @@ types-python-dateutil = ">=2.8.10"
doc = ["doc8", "sphinx (>=7.0.0)", "sphinx-autobuild", "sphinx-autodoc-typehints", "sphinx_rtd_theme (>=1.3.0)"]
test = ["dateparser (==1.*)", "pre-commit", "pytest", "pytest-cov", "pytest-mock", "pytz (==2021.1)", "simplejson (==3.*)"]
[[package]]
name = "asgiref"
version = "3.9.1"
description = "ASGI specs, helper code, and adapters"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "asgiref-3.9.1-py3-none-any.whl", hash = "sha256:f3bba7092a48005b5f5bacd747d36ee4a5a61f4a269a6df590b43144355ebd2c"},
{file = "asgiref-3.9.1.tar.gz", hash = "sha256:a5ab6582236218e5ef1648f242fd9f10626cfd4de8dc377db215d5d5098e3142"},
]
[package.extras]
tests = ["mypy (>=1.14.0)", "pytest", "pytest-asyncio"]
[[package]]
name = "asttokens"
version = "3.0.0"
@@ -1679,6 +1714,30 @@ files = [
[package.extras]
all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"]
[[package]]
name = "importlib-metadata"
version = "8.7.0"
description = "Read metadata from Python packages"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "importlib_metadata-8.7.0-py3-none-any.whl", hash = "sha256:e5dd1551894c77868a30651cef00984d50e1002d06942a7101d34870c5f02afd"},
{file = "importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000"},
]
[package.dependencies]
zipp = ">=3.20"
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
enabler = ["pytest-enabler (>=2.2)"]
perf = ["ipython"]
test = ["flufl.flake8", "importlib_resources (>=1.3) ; python_version < \"3.9\"", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"]
type = ["pytest-mypy"]
[[package]]
name = "ipdb"
version = "0.13.13"
@@ -2263,6 +2322,22 @@ babel = ["Babel"]
lingua = ["lingua"]
testing = ["pytest"]
[[package]]
name = "markdown"
version = "3.8.2"
description = "Python implementation of John Gruber's Markdown."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "markdown-3.8.2-py3-none-any.whl", hash = "sha256:5c83764dbd4e00bdd94d85a19b8d55ccca20fe35b2e678a1422b380324dd5f24"},
{file = "markdown-3.8.2.tar.gz", hash = "sha256:247b9a70dd12e27f67431ce62523e675b866d254f900c4fe75ce3dda62237c45"},
]
[package.extras]
docs = ["mdx_gh_links (>=0.2)", "mkdocs (>=1.6)", "mkdocs-gen-files", "mkdocs-literate-nav", "mkdocs-nature (>=0.6)", "mkdocs-section-index", "mkdocstrings[python]"]
testing = ["coverage", "pyyaml"]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
@@ -2752,6 +2827,23 @@ files = [
{file = "numpy-1.26.4.tar.gz", hash = "sha256:2a02aba9ed12e4ac4eb3ea9421c420301a0c6460d9830d74a9df87efa4912010"},
]
[[package]]
name = "oauthlib"
version = "3.3.1"
description = "A generic, spec-compliant, thorough implementation of the OAuth request-signing logic"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1"},
{file = "oauthlib-3.3.1.tar.gz", hash = "sha256:0f0f8aa759826a193cf66c12ea1af1637f87b9b4622d46e866952bb022e538c9"},
]
[package.extras]
rsa = ["cryptography (>=3.0.0)"]
signals = ["blinker (>=1.4.0)"]
signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"]
[[package]]
name = "opencv-python"
version = "4.11.0.86"
@@ -2775,6 +2867,168 @@ numpy = [
{version = ">=1.23.5", markers = "python_version == \"3.11\""},
]
[[package]]
name = "opentelemetry-api"
version = "1.36.0"
description = "OpenTelemetry Python API"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_api-1.36.0-py3-none-any.whl", hash = "sha256:02f20bcacf666e1333b6b1f04e647dc1d5111f86b8e510238fcc56d7762cda8c"},
{file = "opentelemetry_api-1.36.0.tar.gz", hash = "sha256:9a72572b9c416d004d492cbc6e61962c0501eaf945ece9b5a0f56597d8348aa0"},
]
[package.dependencies]
importlib-metadata = ">=6.0,<8.8.0"
typing-extensions = ">=4.5.0"
[[package]]
name = "opentelemetry-exporter-prometheus"
version = "0.57b0"
description = "Prometheus Metric Exporter for OpenTelemetry"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_exporter_prometheus-0.57b0-py3-none-any.whl", hash = "sha256:c5b893d1cdd593fb022af2c7de3258c2d5a4d04402ae80d9fa35675fed77f05c"},
{file = "opentelemetry_exporter_prometheus-0.57b0.tar.gz", hash = "sha256:9eb15bdc189235cf03c3f93abf56f8ff0ab57a493a189263bd7fe77a4249e689"},
]
[package.dependencies]
opentelemetry-api = ">=1.12,<2.0"
opentelemetry-sdk = ">=1.36.0,<1.37.0"
prometheus-client = ">=0.5.0,<1.0.0"
[[package]]
name = "opentelemetry-instrumentation"
version = "0.57b0"
description = "Instrumentation Tools & Auto Instrumentation for OpenTelemetry Python"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_instrumentation-0.57b0-py3-none-any.whl", hash = "sha256:9109280f44882e07cec2850db28210b90600ae9110b42824d196de357cbddf7e"},
{file = "opentelemetry_instrumentation-0.57b0.tar.gz", hash = "sha256:f2a30135ba77cdea2b0e1df272f4163c154e978f57214795d72f40befd4fcf05"},
]
[package.dependencies]
opentelemetry-api = ">=1.4,<2.0"
opentelemetry-semantic-conventions = "0.57b0"
packaging = ">=18.0"
wrapt = ">=1.0.0,<2.0.0"
[[package]]
name = "opentelemetry-instrumentation-asgi"
version = "0.57b0"
description = "ASGI instrumentation for OpenTelemetry"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_instrumentation_asgi-0.57b0-py3-none-any.whl", hash = "sha256:47debbde6af066a7e8e911f7193730d5e40d62effc1ac2e1119908347790a3ea"},
{file = "opentelemetry_instrumentation_asgi-0.57b0.tar.gz", hash = "sha256:a6f880b5d1838f65688fc992c65fbb1d3571f319d370990c32e759d3160e510b"},
]
[package.dependencies]
asgiref = ">=3.0,<4.0"
opentelemetry-api = ">=1.12,<2.0"
opentelemetry-instrumentation = "0.57b0"
opentelemetry-semantic-conventions = "0.57b0"
opentelemetry-util-http = "0.57b0"
[package.extras]
instruments = ["asgiref (>=3.0,<4.0)"]
[[package]]
name = "opentelemetry-instrumentation-fastapi"
version = "0.57b0"
description = "OpenTelemetry FastAPI Instrumentation"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_instrumentation_fastapi-0.57b0-py3-none-any.whl", hash = "sha256:61e6402749ffe0bfec582e58155e0d81dd38723cd9bc4562bca1acca80334006"},
{file = "opentelemetry_instrumentation_fastapi-0.57b0.tar.gz", hash = "sha256:73ac22f3c472a8f9cb21d1fbe5a4bf2797690c295fff4a1c040e9b1b1688a105"},
]
[package.dependencies]
opentelemetry-api = ">=1.12,<2.0"
opentelemetry-instrumentation = "0.57b0"
opentelemetry-instrumentation-asgi = "0.57b0"
opentelemetry-semantic-conventions = "0.57b0"
opentelemetry-util-http = "0.57b0"
[package.extras]
instruments = ["fastapi (>=0.92,<1.0)"]
[[package]]
name = "opentelemetry-instrumentation-sqlalchemy"
version = "0.57b0"
description = "OpenTelemetry SQLAlchemy instrumentation"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_instrumentation_sqlalchemy-0.57b0-py3-none-any.whl", hash = "sha256:8a1a815331cb04fc95aa7c50e9c681cdccfb12e1fa4522f079fe4b24753ae106"},
{file = "opentelemetry_instrumentation_sqlalchemy-0.57b0.tar.gz", hash = "sha256:95667326b7cc22bb4bc9941f98ca22dd177679f9a4d277646cc21074c0d732ff"},
]
[package.dependencies]
opentelemetry-api = ">=1.12,<2.0"
opentelemetry-instrumentation = "0.57b0"
opentelemetry-semantic-conventions = "0.57b0"
packaging = ">=21.0"
wrapt = ">=1.11.2"
[package.extras]
instruments = ["sqlalchemy (>=1.0.0,<2.1.0)"]
[[package]]
name = "opentelemetry-sdk"
version = "1.36.0"
description = "OpenTelemetry Python SDK"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_sdk-1.36.0-py3-none-any.whl", hash = "sha256:19fe048b42e98c5c1ffe85b569b7073576ad4ce0bcb6e9b4c6a39e890a6c45fb"},
{file = "opentelemetry_sdk-1.36.0.tar.gz", hash = "sha256:19c8c81599f51b71670661ff7495c905d8fdf6976e41622d5245b791b06fa581"},
]
[package.dependencies]
opentelemetry-api = "1.36.0"
opentelemetry-semantic-conventions = "0.57b0"
typing-extensions = ">=4.5.0"
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.57b0"
description = "OpenTelemetry Semantic Conventions"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_semantic_conventions-0.57b0-py3-none-any.whl", hash = "sha256:757f7e76293294f124c827e514c2a3144f191ef175b069ce8d1211e1e38e9e78"},
{file = "opentelemetry_semantic_conventions-0.57b0.tar.gz", hash = "sha256:609a4a79c7891b4620d64c7aac6898f872d790d75f22019913a660756f27ff32"},
]
[package.dependencies]
opentelemetry-api = "1.36.0"
typing-extensions = ">=4.5.0"
[[package]]
name = "opentelemetry-util-http"
version = "0.57b0"
description = "Web util for OpenTelemetry"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "opentelemetry_util_http-0.57b0-py3-none-any.whl", hash = "sha256:e54c0df5543951e471c3d694f85474977cd5765a3b7654398c83bab3d2ffb8e9"},
{file = "opentelemetry_util_http-0.57b0.tar.gz", hash = "sha256:f7417595ead0eb42ed1863ec9b2f839fc740368cd7bbbfc1d0a47bc1ab0aba11"},
]
[[package]]
name = "overrides"
version = "7.7.0"
@@ -3876,6 +4130,25 @@ urllib3 = ">=1.21.1,<3"
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "requests-oauthlib"
version = "2.0.0"
description = "OAuthlib authentication support for Requests."
optional = false
python-versions = ">=3.4"
groups = ["main"]
files = [
{file = "requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9"},
{file = "requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36"},
]
[package.dependencies]
oauthlib = ">=3.0.0"
requests = ">=2.0.0"
[package.extras]
rsa = ["oauthlib[signedtoken] (>=3.0.0)"]
[[package]]
name = "rfc3339-validator"
version = "0.1.4"
@@ -5050,6 +5323,95 @@ files = [
{file = "websockets-15.0.1.tar.gz", hash = "sha256:82544de02076bafba038ce055ee6412d68da13ab47f0c60cab827346de828dee"},
]
[[package]]
name = "wrapt"
version = "1.17.2"
description = "Module for decorators, wrappers and monkey patching."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "wrapt-1.17.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3d57c572081fed831ad2d26fd430d565b76aa277ed1d30ff4d40670b1c0dd984"},
{file = "wrapt-1.17.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b5e251054542ae57ac7f3fba5d10bfff615b6c2fb09abeb37d2f1463f841ae22"},
{file = "wrapt-1.17.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:80dd7db6a7cb57ffbc279c4394246414ec99537ae81ffd702443335a61dbf3a7"},
{file = "wrapt-1.17.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0a6e821770cf99cc586d33833b2ff32faebdbe886bd6322395606cf55153246c"},
{file = "wrapt-1.17.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b60fb58b90c6d63779cb0c0c54eeb38941bae3ecf7a73c764c52c88c2dcb9d72"},
{file = "wrapt-1.17.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b870b5df5b71d8c3359d21be8f0d6c485fa0ebdb6477dda51a1ea54a9b558061"},
{file = "wrapt-1.17.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:4011d137b9955791f9084749cba9a367c68d50ab8d11d64c50ba1688c9b457f2"},
{file = "wrapt-1.17.2-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:1473400e5b2733e58b396a04eb7f35f541e1fb976d0c0724d0223dd607e0f74c"},
{file = "wrapt-1.17.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:3cedbfa9c940fdad3e6e941db7138e26ce8aad38ab5fe9dcfadfed9db7a54e62"},
{file = "wrapt-1.17.2-cp310-cp310-win32.whl", hash = "sha256:582530701bff1dec6779efa00c516496968edd851fba224fbd86e46cc6b73563"},
{file = "wrapt-1.17.2-cp310-cp310-win_amd64.whl", hash = "sha256:58705da316756681ad3c9c73fd15499aa4d8c69f9fd38dc8a35e06c12468582f"},
{file = "wrapt-1.17.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:ff04ef6eec3eee8a5efef2401495967a916feaa353643defcc03fc74fe213b58"},
{file = "wrapt-1.17.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4db983e7bca53819efdbd64590ee96c9213894272c776966ca6306b73e4affda"},
{file = "wrapt-1.17.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:9abc77a4ce4c6f2a3168ff34b1da9b0f311a8f1cfd694ec96b0603dff1c79438"},
{file = "wrapt-1.17.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0b929ac182f5ace000d459c59c2c9c33047e20e935f8e39371fa6e3b85d56f4a"},
{file = "wrapt-1.17.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f09b286faeff3c750a879d336fb6d8713206fc97af3adc14def0cdd349df6000"},
{file = "wrapt-1.17.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1a7ed2d9d039bd41e889f6fb9364554052ca21ce823580f6a07c4ec245c1f5d6"},
{file = "wrapt-1.17.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:129a150f5c445165ff941fc02ee27df65940fcb8a22a61828b1853c98763a64b"},
{file = "wrapt-1.17.2-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1fb5699e4464afe5c7e65fa51d4f99e0b2eadcc176e4aa33600a3df7801d6662"},
{file = "wrapt-1.17.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:9a2bce789a5ea90e51a02dfcc39e31b7f1e662bc3317979aa7e5538e3a034f72"},
{file = "wrapt-1.17.2-cp311-cp311-win32.whl", hash = "sha256:4afd5814270fdf6380616b321fd31435a462019d834f83c8611a0ce7484c7317"},
{file = "wrapt-1.17.2-cp311-cp311-win_amd64.whl", hash = "sha256:acc130bc0375999da18e3d19e5a86403667ac0c4042a094fefb7eec8ebac7cf3"},
{file = "wrapt-1.17.2-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:d5e2439eecc762cd85e7bd37161d4714aa03a33c5ba884e26c81559817ca0925"},
{file = "wrapt-1.17.2-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:3fc7cb4c1c744f8c05cd5f9438a3caa6ab94ce8344e952d7c45a8ed59dd88392"},
{file = "wrapt-1.17.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8fdbdb757d5390f7c675e558fd3186d590973244fab0c5fe63d373ade3e99d40"},
{file = "wrapt-1.17.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5bb1d0dbf99411f3d871deb6faa9aabb9d4e744d67dcaaa05399af89d847a91d"},
{file = "wrapt-1.17.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d18a4865f46b8579d44e4fe1e2bcbc6472ad83d98e22a26c963d46e4c125ef0b"},
{file = "wrapt-1.17.2-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc570b5f14a79734437cb7b0500376b6b791153314986074486e0b0fa8d71d98"},
{file = "wrapt-1.17.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6d9187b01bebc3875bac9b087948a2bccefe464a7d8f627cf6e48b1bbae30f82"},
{file = "wrapt-1.17.2-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:9e8659775f1adf02eb1e6f109751268e493c73716ca5761f8acb695e52a756ae"},
{file = "wrapt-1.17.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e8b2816ebef96d83657b56306152a93909a83f23994f4b30ad4573b00bd11bb9"},
{file = "wrapt-1.17.2-cp312-cp312-win32.whl", hash = "sha256:468090021f391fe0056ad3e807e3d9034e0fd01adcd3bdfba977b6fdf4213ea9"},
{file = "wrapt-1.17.2-cp312-cp312-win_amd64.whl", hash = "sha256:ec89ed91f2fa8e3f52ae53cd3cf640d6feff92ba90d62236a81e4e563ac0e991"},
{file = "wrapt-1.17.2-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:6ed6ffac43aecfe6d86ec5b74b06a5be33d5bb9243d055141e8cabb12aa08125"},
{file = "wrapt-1.17.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:35621ae4c00e056adb0009f8e86e28eb4a41a4bfa8f9bfa9fca7d343fe94f998"},
{file = "wrapt-1.17.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a604bf7a053f8362d27eb9fefd2097f82600b856d5abe996d623babd067b1ab5"},
{file = "wrapt-1.17.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5cbabee4f083b6b4cd282f5b817a867cf0b1028c54d445b7ec7cfe6505057cf8"},
{file = "wrapt-1.17.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:49703ce2ddc220df165bd2962f8e03b84c89fee2d65e1c24a7defff6f988f4d6"},
{file = "wrapt-1.17.2-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8112e52c5822fc4253f3901b676c55ddf288614dc7011634e2719718eaa187dc"},
{file = "wrapt-1.17.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:9fee687dce376205d9a494e9c121e27183b2a3df18037f89d69bd7b35bcf59e2"},
{file = "wrapt-1.17.2-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:18983c537e04d11cf027fbb60a1e8dfd5190e2b60cc27bc0808e653e7b218d1b"},
{file = "wrapt-1.17.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:703919b1633412ab54bcf920ab388735832fdcb9f9a00ae49387f0fe67dad504"},
{file = "wrapt-1.17.2-cp313-cp313-win32.whl", hash = "sha256:abbb9e76177c35d4e8568e58650aa6926040d6a9f6f03435b7a522bf1c487f9a"},
{file = "wrapt-1.17.2-cp313-cp313-win_amd64.whl", hash = "sha256:69606d7bb691b50a4240ce6b22ebb319c1cfb164e5f6569835058196e0f3a845"},
{file = "wrapt-1.17.2-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:4a721d3c943dae44f8e243b380cb645a709ba5bd35d3ad27bc2ed947e9c68192"},
{file = "wrapt-1.17.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:766d8bbefcb9e00c3ac3b000d9acc51f1b399513f44d77dfe0eb026ad7c9a19b"},
{file = "wrapt-1.17.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:e496a8ce2c256da1eb98bd15803a79bee00fc351f5dfb9ea82594a3f058309e0"},
{file = "wrapt-1.17.2-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:40d615e4fe22f4ad3528448c193b218e077656ca9ccb22ce2cb20db730f8d306"},
{file = "wrapt-1.17.2-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a5aaeff38654462bc4b09023918b7f21790efb807f54c000a39d41d69cf552cb"},
{file = "wrapt-1.17.2-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9a7d15bbd2bc99e92e39f49a04653062ee6085c0e18b3b7512a4f2fe91f2d681"},
{file = "wrapt-1.17.2-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:e3890b508a23299083e065f435a492b5435eba6e304a7114d2f919d400888cc6"},
{file = "wrapt-1.17.2-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:8c8b293cd65ad716d13d8dd3624e42e5a19cc2a2f1acc74b30c2c13f15cb61a6"},
{file = "wrapt-1.17.2-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:4c82b8785d98cdd9fed4cac84d765d234ed3251bd6afe34cb7ac523cb93e8b4f"},
{file = "wrapt-1.17.2-cp313-cp313t-win32.whl", hash = "sha256:13e6afb7fe71fe7485a4550a8844cc9ffbe263c0f1a1eea569bc7091d4898555"},
{file = "wrapt-1.17.2-cp313-cp313t-win_amd64.whl", hash = "sha256:eaf675418ed6b3b31c7a989fd007fa7c3be66ce14e5c3b27336383604c9da85c"},
{file = "wrapt-1.17.2-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:5c803c401ea1c1c18de70a06a6f79fcc9c5acfc79133e9869e730ad7f8ad8ef9"},
{file = "wrapt-1.17.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:f917c1180fdb8623c2b75a99192f4025e412597c50b2ac870f156de8fb101119"},
{file = "wrapt-1.17.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:ecc840861360ba9d176d413a5489b9a0aff6d6303d7e733e2c4623cfa26904a6"},
{file = "wrapt-1.17.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bb87745b2e6dc56361bfde481d5a378dc314b252a98d7dd19a651a3fa58f24a9"},
{file = "wrapt-1.17.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:58455b79ec2661c3600e65c0a716955adc2410f7383755d537584b0de41b1d8a"},
{file = "wrapt-1.17.2-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b4e42a40a5e164cbfdb7b386c966a588b1047558a990981ace551ed7e12ca9c2"},
{file = "wrapt-1.17.2-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:91bd7d1773e64019f9288b7a5101f3ae50d3d8e6b1de7edee9c2ccc1d32f0c0a"},
{file = "wrapt-1.17.2-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:bb90fb8bda722a1b9d48ac1e6c38f923ea757b3baf8ebd0c82e09c5c1a0e7a04"},
{file = "wrapt-1.17.2-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:08e7ce672e35efa54c5024936e559469436f8b8096253404faeb54d2a878416f"},
{file = "wrapt-1.17.2-cp38-cp38-win32.whl", hash = "sha256:410a92fefd2e0e10d26210e1dfb4a876ddaf8439ef60d6434f21ef8d87efc5b7"},
{file = "wrapt-1.17.2-cp38-cp38-win_amd64.whl", hash = "sha256:95c658736ec15602da0ed73f312d410117723914a5c91a14ee4cdd72f1d790b3"},
{file = "wrapt-1.17.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:99039fa9e6306880572915728d7f6c24a86ec57b0a83f6b2491e1d8ab0235b9a"},
{file = "wrapt-1.17.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2696993ee1eebd20b8e4ee4356483c4cb696066ddc24bd70bcbb80fa56ff9061"},
{file = "wrapt-1.17.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:612dff5db80beef9e649c6d803a8d50c409082f1fedc9dbcdfde2983b2025b82"},
{file = "wrapt-1.17.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:62c2caa1585c82b3f7a7ab56afef7b3602021d6da34fbc1cf234ff139fed3cd9"},
{file = "wrapt-1.17.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c958bcfd59bacc2d0249dcfe575e71da54f9dcf4a8bdf89c4cb9a68a1170d73f"},
{file = "wrapt-1.17.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fc78a84e2dfbc27afe4b2bd7c80c8db9bca75cc5b85df52bfe634596a1da846b"},
{file = "wrapt-1.17.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:ba0f0eb61ef00ea10e00eb53a9129501f52385c44853dbd6c4ad3f403603083f"},
{file = "wrapt-1.17.2-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:1e1fe0e6ab7775fd842bc39e86f6dcfc4507ab0ffe206093e76d61cde37225c8"},
{file = "wrapt-1.17.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c86563182421896d73858e08e1db93afdd2b947a70064b813d515d66549e15f9"},
{file = "wrapt-1.17.2-cp39-cp39-win32.whl", hash = "sha256:f393cda562f79828f38a819f4788641ac7c4085f30f1ce1a68672baa686482bb"},
{file = "wrapt-1.17.2-cp39-cp39-win_amd64.whl", hash = "sha256:36ccae62f64235cf8ddb682073a60519426fdd4725524ae38874adf72b5f2aeb"},
{file = "wrapt-1.17.2-py3-none-any.whl", hash = "sha256:b18f2d1533a71f069c7f82d524a52599053d4c7166e9dd374ae2136b7f40f7c8"},
{file = "wrapt-1.17.2.tar.gz", hash = "sha256:41388e9d4d1522446fe79d3213196bd9e3b301a336965b9e27ca2788ebd122f3"},
]
[[package]]
name = "yarl"
version = "1.20.1"
@@ -5169,7 +5531,27 @@ idna = ">=2.0"
multidict = ">=4.0"
propcache = ">=0.2.1"
[[package]]
name = "zipp"
version = "3.23.0"
description = "Backport of pathlib-compatible object wrapper for zip files"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e"},
{file = "zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166"},
]
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more_itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"]
type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">3.11"
content-hash = "d8fefd01d3b4a213cf389c63829e901ce921eb931cb7034af6c869a47a557a59"
content-hash = "9356591426eaabc61359774a41a9f508f597fe844a023fb3b2291ba779cb4674"

View file

@@ -32,6 +32,12 @@ mysqlclient = "^2.2.7"
celery = "^5.5.3"
redis = "^6.2.0"
watchdog = "^6.0.0"
apprise = "^1.9.3"
opentelemetry-api = "^1.36.0"
opentelemetry-sdk = "^1.36.0"
opentelemetry-exporter-prometheus = "^0.57b0"
opentelemetry-instrumentation-fastapi = "^0.57b0"
opentelemetry-instrumentation-sqlalchemy = "^0.57b0"
[tool.poetry.group.dev.dependencies]
ipdb = "^0.13.13"

View file

@@ -77,7 +77,7 @@ class ListingRepository:
if query_parameters.furnish_types:
query = query.where(model.furnish_type.in_(query_parameters.furnish_types))
if (
isinstance(model, BuyListing)
isinstance(model, RentListing)
and query_parameters.let_date_available_from is not None
):
query = query.where(
@@ -120,9 +120,6 @@ class ListingRepository:
try:
model_listing = await self._get_concrete_listing(listing)
except Exception as e: # WHY SO MANY ERRORS??
import ipdb
ipdb.set_trace()
# If for whatever reason we cannot add listing, ignore and retry
print(f"Error converting listing {listing.identifier}: {e}")
failed_to_upsert.append(listing)

View file

@@ -23,7 +23,7 @@ case "$ENV_MODE" in
prod)
echo "🚀 Running in PRODUCTION mode"
alembic upgrade head
celery -A celery_app worker &
celery -A celery_app worker --beat &
CELERY_PID=$!
;;
*)

View file

@@ -1,17 +1,17 @@
import asyncio
import importlib
import itertools
import logging
from pathlib import Path
import time
from typing import Any
from celery import Celery, Task
from celery import Task
from celery_app import app
from models.listing import Listing, QueryParameters
from listing_processor import ListingProcessor
from models.listing import Listing, ListingType, QueryParameters
from rec.districts import get_districts
from rec.query import listing_query
from repositories.listing_repository import ListingRepository
from database import engine
from tasks.task_state import TaskStatus
dump_listings_module = importlib.import_module("1_dump_listings")
dump_images_module = importlib.import_module("3_dump_images")
detect_floorplan_module = importlib.import_module("4_detect_floorplan")
@@ -21,25 +21,189 @@ logger = logging.getLogger("uvicorn.error")
@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
parsed_parameters = QueryParameters.model_validate_json(parameters_json)
asyncio.run(dump_listings_full(self, parsed_parameters))
return {"progress": 1}
self.update_state(state="Starting...", meta={"progress": 0})
asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
return {"progress": 0}
async def dump_listings_full(self: Task, parameters: QueryParameters) -> list[Listing]:
async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
parsed_parameters = QueryParameters.model_validate_json(parameters_json)
await dump_listings_full(task=Task(), parameters=parsed_parameters)
return {"progress": 0}
async def dump_listings_full(
*, task: Task, parameters: QueryParameters
) -> list[Listing]:
"""Fetches all listings, images as well as detects floorplans"""
self.update_state(state="FETCHING_LISTINGS", meta={"progress": 0.1})
repository = ListingRepository(engine)
new_listings = await dump_listings_module.dump_listings(parameters, repository)
self.update_state(state="FETCHING_FLOORPLANS", meta={"progress": 0.3})
logger.debug(f"Upserted {len(new_listings)} new listings")
logger.debug("Starting to fetch floorplans")
await dump_images_module.dump_images(repository)
self.update_state(state="RUNNING_OCR_ON_FLOORPLANS", meta={"progress": 0.6})
logger.debug("Completed fetching floorplans")
logger.debug("Starting floorplan detection")
await detect_floorplan_module.detect_floorplan(repository)
logger.debug("Completed floorplan detection")
# refresh listings
listings = await repository.get_listings(parameters) # this can be better
new_listings = [l for l in listings if l.id in new_listings]
return new_listings
task.update_state(state="Identifying missing listings", meta={"progress": 0})
ids_to_process = await get_ids_to_process(
parameters=parameters, repository=repository, task=task
)
logger.info(f"Found {len(ids_to_process)} listings to process")
listing_processor = ListingProcessor(repository)
logger.info(f"Starting processing {len(ids_to_process)} listings")
return await dump_listings_and_monitor(
task=task, listing_processor=listing_processor, missing_ids=ids_to_process
)
async def dump_listings_and_monitor(
*, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
task_progress = {missing_id: 0 for missing_id in missing_ids}
async def process(missing_id: int) -> Listing | None:
listing = await listing_processor.process_listing(missing_id)
task_progress[missing_id] = 1
return listing
async def monitor() -> None:
while (progress := sum(task_progress.values())) < len(missing_ids):
progress_ratio = round(progress / len(missing_ids), 2)
logger.error(
f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})"
)
task.update_state(
state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})",
meta={"progress": progress_ratio},
)
await asyncio.sleep(1)
processed_listings = await asyncio.gather(
*[process(id) for id in missing_ids], *[monitor()]
)
filtered_listings = [l for l in processed_listings if l is not None]
return filtered_listings
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
3600 * 24, # Daily updates
dump_listings_task.s(
QueryParameters(
listing_type=ListingType.RENT,
min_bedrooms=2,
max_bedrooms=3,
min_price=2000,
max_price=4000,
).model_dump_json()
),
name="Daily dump of interesting rent listings",
)
async def get_ids_to_process(
*,
parameters: QueryParameters,
repository: ListingRepository,
task: Task,
) -> set[int]:
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
districts = await get_valid_districts_to_scrape(parameters.district_names)
task.update_state(state="Fetching listings to scrape", meta={"progress": 0})
json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
*[
_fetch_listings_with_semaphore(
task=task, semaphore=semaphore, parameters=parameters, district=district
)
for district in districts.keys()
],
)
json_responses_flat = list(itertools.chain.from_iterable(json_responses))
logger.debug(f"Total listings fetched {len(json_responses_flat)}")
identifiers: set[int] = set()
for response_json in json_responses_flat:
if response_json == {}:
continue
if response_json["totalAvailableResults"] == 0:
continue
for property in response_json["properties"]:
identifier = property["identifier"]
identifiers.add(identifier)
# if listing is already in db, do not fetch details again
all_listing_ids = {l.id for l in await repository.get_listings()}
all_ids = all_listing_ids.union(identifiers)
return all_ids
async def get_valid_districts_to_scrape(
district_names: set[str] | None,
) -> dict[str, str]:
if district_names:
districts = {
district: locid
for district, locid in get_districts().items()
if district in district_names
}
else:
districts = get_districts()
return districts
async def _fetch_listings_with_semaphore(
*,
task: Task,
semaphore: asyncio.Semaphore,
parameters: QueryParameters,
district: str,
) -> list[dict[str, Any]]:
result = []
# Split the price range into N bands to work around Rightmove's 1,500-result cap:
# instead of one query over the whole range (e.g. £1k-£5k, capped at 1,500 results),
# we issue 10 narrower queries, one per price band, so each is far less likely to hit
# the cap (e.g. with max_price=4000 the bands are 0-400, 400-800, ..., 3600-4000).
number_of_steps = 10
price_step = parameters.max_price // number_of_steps
for step in range(number_of_steps):
task.update_state(
state=f"Fetching listings ({step} out of {number_of_steps})",
meta={"progress": step / number_of_steps},
)
min_price = step * price_step
max_price = (step + 1) * price_step
logger.debug(
f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
)
for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
for page_id in range(
1,
3, # seems like all searches stop at 1500 entries (page_id * page_size)
):
logger.debug(f"Processing {page_id=} for {district=}")
async with semaphore:
try:
listing_query_result = await listing_query(
page=page_id,
channel=parameters.listing_type,
# min_bedrooms=parameters.min_bedrooms,
# max_bedrooms=parameters.max_bedrooms,
min_bedrooms=num_bedrooms,
max_bedrooms=num_bedrooms,
radius=parameters.radius,
min_price=min_price,
max_price=max_price,
district=district,
page_size=parameters.page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
)
except Exception as e:
if "GENERIC_ERROR" in str(e): # Too big page id
logger.debug(f"Max page id for {district=}: {page_id-1}")
break
raise e
result.append(listing_query_result)
return result