From 762408e0543fb95d0594592220e69f006640a245 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 25 Jul 2025 21:32:06 +0000 Subject: [PATCH 01/10] add apprise and send notification when refreshing listings --- crawler/api/app.py | 2 + crawler/data_access.py | 2 +- crawler/notifications.py | 27 ++++++++ crawler/poetry.lock | 74 +++++++++++++++++++++- crawler/pyproject.toml | 1 + crawler/repositories/listing_repository.py | 5 +- 6 files changed, 105 insertions(+), 6 deletions(-) create mode 100644 crawler/notifications.py diff --git a/crawler/api/app.py b/crawler/api/app.py index 73ba146..eec5d9a 100644 --- a/crawler/api/app.py +++ b/crawler/api/app.py @@ -15,6 +15,7 @@ from dotenv import load_dotenv from fastapi import Depends, FastAPI, HTTPException, Query from api.auth import User from models.listing import QueryParameters +from notifications import send_notification from rec import districts from redis_repository import RedisRepository from repositories.listing_repository import ListingRepository @@ -82,6 +83,7 @@ async def refresh_listings( user: Annotated[User, Depends(get_current_user)], query_parameters: Annotated[QueryParameters, Query()], ) -> dict[str, str]: + await send_notification(f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}") # TODO: rate limit expiry_time = datetime.now() + timedelta(minutes=10) task = listing_tasks.dump_listings_task.apply_async( diff --git a/crawler/data_access.py b/crawler/data_access.py index 2e6fa54..3b9bf68 100644 --- a/crawler/data_access.py +++ b/crawler/data_access.py @@ -110,7 +110,7 @@ class Listing: # some places list pw in price and others pcm price = max( - self._listing_object["price"], + self._listing_object["price"] or 0, self._listing_object.get("monthlyRent", 0) or 0, ) self.append_price_history(price) diff --git a/crawler/notifications.py b/crawler/notifications.py new file mode 100644 index 0000000..a5eb656 --- /dev/null +++ b/crawler/notifications.py @@ -0,0 +1,27 @@ +from abc import abstractmethod +from enum import StrEnum +import apprise +from functools import lru_cache + + +class Surface: + @abstractmethod + def connection_string(self) -> str | None: + ... + +class Slack(Surface): + def connection_string(self) -> str | None: + return "https://hooks.slack.com/services/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" + + +@lru_cache(maxsize=None) +def get_notifier(surfaces: list[Surface] | None = None) -> apprise.Apprise: + surfaces = surfaces or [Slack()] + obj = apprise.Apprise() + for surface in surfaces: + obj.add(surface.connection_string()) + return obj + +async def send_notification( body: str, title: str='') -> bool: + notifier = get_notifier() + return await notifier.async_notify(body=body, title=title) diff --git a/crawler/poetry.lock b/crawler/poetry.lock index 78834c6..ef1119a 100644 --- a/crawler/poetry.lock +++ b/crawler/poetry.lock @@ -217,6 +217,26 @@ files = [ {file = "appnope-0.1.4.tar.gz", hash = "sha256:1de3860566df9caf38f01f86f65e0e13e379af54f9e4bee1e66b48f2efffd1ee"}, ] +[[package]] +name = "apprise" +version = "1.9.3" +description = "Push Notifications that work with just about every platform!" 
+optional = false +python-versions = ">=3.6" +groups = ["main"] +files = [ + {file = "apprise-1.9.3-py3-none-any.whl", hash = "sha256:e9b5abb73244c21a30ee493860f8d4ae80665d225b1b436179d48db4f6fc5b9e"}, + {file = "apprise-1.9.3.tar.gz", hash = "sha256:f583667ea35b8899cd46318c6cb26f0faf6a4605b119174c2523a012590c65a6"}, +] + +[package.dependencies] +certifi = "*" +click = ">=5.0" +markdown = "*" +PyYAML = "*" +requests = "*" +requests-oauthlib = "*" + [[package]] name = "argon2-cffi" version = "25.1.0" @@ -2263,6 +2283,22 @@ babel = ["Babel"] lingua = ["lingua"] testing = ["pytest"] +[[package]] +name = "markdown" +version = "3.8.2" +description = "Python implementation of John Gruber's Markdown." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "markdown-3.8.2-py3-none-any.whl", hash = "sha256:5c83764dbd4e00bdd94d85a19b8d55ccca20fe35b2e678a1422b380324dd5f24"}, + {file = "markdown-3.8.2.tar.gz", hash = "sha256:247b9a70dd12e27f67431ce62523e675b866d254f900c4fe75ce3dda62237c45"}, +] + +[package.extras] +docs = ["mdx_gh_links (>=0.2)", "mkdocs (>=1.6)", "mkdocs-gen-files", "mkdocs-literate-nav", "mkdocs-nature (>=0.6)", "mkdocs-section-index", "mkdocstrings[python]"] +testing = ["coverage", "pyyaml"] + [[package]] name = "markdown-it-py" version = "3.0.0" @@ -2752,6 +2788,23 @@ files = [ {file = "numpy-1.26.4.tar.gz", hash = "sha256:2a02aba9ed12e4ac4eb3ea9421c420301a0c6460d9830d74a9df87efa4912010"}, ] +[[package]] +name = "oauthlib" +version = "3.3.1" +description = "A generic, spec-compliant, thorough implementation of the OAuth request-signing logic" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1"}, + {file = "oauthlib-3.3.1.tar.gz", hash = "sha256:0f0f8aa759826a193cf66c12ea1af1637f87b9b4622d46e866952bb022e538c9"}, +] + +[package.extras] +rsa = ["cryptography (>=3.0.0)"] +signals = ["blinker (>=1.4.0)"] +signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] + [[package]] name = "opencv-python" version = "4.11.0.86" @@ -3876,6 +3929,25 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "requests-oauthlib" +version = "2.0.0" +description = "OAuthlib authentication support for Requests." 
+optional = false +python-versions = ">=3.4" +groups = ["main"] +files = [ + {file = "requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9"}, + {file = "requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36"}, +] + +[package.dependencies] +oauthlib = ">=3.0.0" +requests = ">=2.0.0" + +[package.extras] +rsa = ["oauthlib[signedtoken] (>=3.0.0)"] + [[package]] name = "rfc3339-validator" version = "0.1.4" @@ -5172,4 +5244,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = ">3.11" -content-hash = "d8fefd01d3b4a213cf389c63829e901ce921eb931cb7034af6c869a47a557a59" +content-hash = "110fae166c8a5b2f9c178f822097ee5f939d22f04445bab6ca945612e08a4ed8" diff --git a/crawler/pyproject.toml b/crawler/pyproject.toml index e7e7961..92e1a7f 100644 --- a/crawler/pyproject.toml +++ b/crawler/pyproject.toml @@ -32,6 +32,7 @@ mysqlclient = "^2.2.7" celery = "^5.5.3" redis = "^6.2.0" watchdog = "^6.0.0" +apprise = "^1.9.3" [tool.poetry.group.dev.dependencies] ipdb = "^0.13.13" diff --git a/crawler/repositories/listing_repository.py b/crawler/repositories/listing_repository.py index 61a5818..69c1219 100644 --- a/crawler/repositories/listing_repository.py +++ b/crawler/repositories/listing_repository.py @@ -77,7 +77,7 @@ class ListingRepository: if query_parameters.furnish_types: query = query.where(model.furnish_type.in_(query_parameters.furnish_types)) if ( - isinstance(model, BuyListing) + isinstance(model, RentListing) and query_parameters.let_date_available_from is not None ): query = query.where( @@ -120,9 +120,6 @@ class ListingRepository: try: model_listing = await self._get_concrete_listing(listing) except Exception as e: # WHY SO MANY ERORRS?? - import ipdb - - ipdb.set_trace() # If for whatever reason we cannot add listing, ignore and retry print(f"Error converting listing {listing.identifier}: {e}") failed_to_upsert.append(listing) From 272d54d0146d0207193a32d52ae05ed816abdf7d Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 25 Jul 2025 22:14:45 +0000 Subject: [PATCH 02/10] add daily scrape of interesting rent listings --- crawler/start.sh | 2 +- crawler/tasks/listing_tasks.py | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/crawler/start.sh b/crawler/start.sh index 73d9659..ca7b779 100755 --- a/crawler/start.sh +++ b/crawler/start.sh @@ -23,7 +23,7 @@ case "$ENV_MODE" in prod) echo "🚀 Running in PRODUCTION mode" alembic upgrade head - celery -A celery_app worker & + celery -A celery_app worker --beat & CELERY_PID=$! 
    ;;
*)

diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py
index 59e99f8..205fa64 100644
--- a/crawler/tasks/listing_tasks.py
+++ b/crawler/tasks/listing_tasks.py
@@ -2,11 +2,10 @@ import asyncio
 import importlib
 import logging
 from pathlib import Path
-import time
 from typing import Any
 from celery import Celery, Task
 from celery_app import app
-from models.listing import Listing, QueryParameters
+from models.listing import FurnishType, Listing, ListingType, QueryParameters
 from repositories.listing_repository import ListingRepository
 from database import engine
 from tasks.task_state import TaskStatus
@@ -43,3 +42,20 @@ async def dump_listings_full(self: Task, parameters: QueryParameters) -> list[Li
     listings = await repository.get_listings(parameters)  # this can be better
     new_listings = [l for l in listings if l.id in new_listings]
     return new_listings
+
+
+@app.on_after_finalize.connect
+def setup_periodic_tasks(sender, **kwargs):
+    sender.add_periodic_task(
+        3600 * 24,  # Daily updates
+        name='Daily dump of interesting rent listings',
+        dump_listings_task.s(
+            QueryParameters(
+                listing_type=ListingType.RENT,
+                min_bedrooms=2,
+                max_bedrooms=3,
+                min_price=2000,
+                max_price=4000,
+            ).model_dump_json()
+        ),
+    )

From 206471cee881a3145005eb8bfbb3a1994616a12a Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sat, 26 Jul 2025 10:38:51 +0000
Subject: [PATCH 03/10] fix argument error in tasks

---
 crawler/tasks/listing_tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py
index 205fa64..f31a0dd 100644
--- a/crawler/tasks/listing_tasks.py
+++ b/crawler/tasks/listing_tasks.py
@@ -48,7 +48,6 @@ def setup_periodic_tasks(sender, **kwargs):
     sender.add_periodic_task(
         3600 * 24,  # Daily updates
-        name='Daily dump of interesting rent listings',
         dump_listings_task.s(
             QueryParameters(
                 listing_type=ListingType.RENT,
                 min_bedrooms=2,
                 max_bedrooms=3,
                 min_price=2000,
                 max_price=4000,
             ).model_dump_json()
         ),
+        name='Daily dump of interesting rent listings',
     )

From 4e7734d32718ce9ffd1a4855cb2eea7d2c87d72e Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sat, 26 Jul 2025 13:06:28 +0000
Subject: [PATCH 04/10] add healthcheck api endpoint

---
 crawler/api/app.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/crawler/api/app.py b/crawler/api/app.py
index eec5d9a..fb48a4a 100644
--- a/crawler/api/app.py
+++ b/crawler/api/app.py
@@ -58,6 +58,11 @@ app.add_middleware(
 )
 
 
+@app.get("/api/status")
+async def get_status():
+    return {"status": "OK"}
+
+
 @app.get("/api/listing")
 async def get_listing(user: Annotated[User, Depends(get_current_user)]):
     repository = ListingRepository(engine)
@@ -83,7 +88,9 @@ async def refresh_listings(
     user: Annotated[User, Depends(get_current_user)],
     query_parameters: Annotated[QueryParameters, Query()],
 ) -> dict[str, str]:
-    await send_notification(f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}")
+    await send_notification(
+        f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
+    )
     # TODO: rate limit
     expiry_time = datetime.now() + timedelta(minutes=10)
     task = listing_tasks.dump_listings_task.apply_async(

From 42ed20b83394f80dc715c56b4aac051a74be10f2 Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sat, 26 Jul 2025 13:15:21 +0000
Subject: [PATCH 05/10] read slack notification
webhook url from env --- crawler/notifications.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crawler/notifications.py b/crawler/notifications.py index a5eb656..735fd7e 100644 --- a/crawler/notifications.py +++ b/crawler/notifications.py @@ -2,16 +2,17 @@ from abc import abstractmethod from enum import StrEnum import apprise from functools import lru_cache +import os class Surface: @abstractmethod - def connection_string(self) -> str | None: - ... + def connection_string(self) -> str | None: ... + class Slack(Surface): def connection_string(self) -> str | None: - return "https://hooks.slack.com/services/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" + return os.environ.get("SLACK_WEBHOOK_URL") @lru_cache(maxsize=None) @@ -19,9 +20,11 @@ def get_notifier(surfaces: list[Surface] | None = None) -> apprise.Apprise: surfaces = surfaces or [Slack()] obj = apprise.Apprise() for surface in surfaces: - obj.add(surface.connection_string()) + if conn := surface.connection_string(): + obj.add(conn) return obj -async def send_notification( body: str, title: str='') -> bool: + +async def send_notification(body: str, title: str = "") -> bool: notifier = get_notifier() return await notifier.async_notify(body=body, title=title) From 4fa09e31c808616869897fa6e649044f3ab59a41 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 27 Jul 2025 11:11:04 +0000 Subject: [PATCH 06/10] make property image clickable --- crawler/frontend/src/components/Map.tsx | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crawler/frontend/src/components/Map.tsx b/crawler/frontend/src/components/Map.tsx index db966b6..7471f97 100644 --- a/crawler/frontend/src/components/Map.tsx +++ b/crawler/frontend/src/components/Map.tsx @@ -256,7 +256,9 @@ export function Map( const lastSeenDays = Math.round((new Date() - new Date(lastSeenStr)) / (1000 * 60 * 60 * 24)); return
<div>
-                <img src={listing.photo_thumbnail} />
+                <a href={`https://www.rightmove.co.uk/properties/${listing.id}`} target="_blank" rel="noopener noreferrer">
+                    <img src={listing.photo_thumbnail} />
+                </a>
Available from: From 91a0436f7fd1cdf35ffd3a65552da49042a69982 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 27 Jul 2025 18:33:39 +0000 Subject: [PATCH 07/10] migrate processing to a pipeline approach where each listing is processed in a pipeline in parallel and status reported back to track progress --- crawler/3_dump_images.py | 2 +- crawler/api/app.py | 2 + crawler/listing_processor.py | 184 +++++++++++++++++++++++++++++++++ crawler/main.py | 1 + crawler/notifications.py | 2 +- crawler/tasks/listing_tasks.py | 182 +++++++++++++++++++++++++++----- 6 files changed, 347 insertions(+), 26 deletions(-) create mode 100644 crawler/listing_processor.py diff --git a/crawler/3_dump_images.py b/crawler/3_dump_images.py index f54f24f..1c8f3f5 100644 --- a/crawler/3_dump_images.py +++ b/crawler/3_dump_images.py @@ -27,7 +27,7 @@ async def dump_images( @retry(wait=wait_random(min=1, max=2), stop=stop_after_attempt(3)) async def dump_images_for_listing(listing: Listing, base_path: Path) -> Listing | None: - all_floorplans = listing.additional_info["property"]["floorplans"] + all_floorplans = listing.additional_info.get("property", {}).get("floorplans", []) for floorplan in all_floorplans: url = floorplan["url"] picname = url.split("/")[-1] diff --git a/crawler/api/app.py b/crawler/api/app.py index fb48a4a..6f6cffd 100644 --- a/crawler/api/app.py +++ b/crawler/api/app.py @@ -91,6 +91,8 @@ async def refresh_listings( await send_notification( f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}" ) + # await listing_tasks.async_dump_listings_task(query_parameters.model_dump_json()) # Use this for local debugging - run task in sync + # return {} # TODO: rate limit expiry_time = datetime.now() + timedelta(minutes=10) task = listing_tasks.dump_listings_task.apply_async( diff --git a/crawler/listing_processor.py b/crawler/listing_processor.py new file mode 100644 index 0000000..6955050 --- /dev/null +++ b/crawler/listing_processor.py @@ -0,0 +1,184 @@ +from __future__ import annotations +from abc import abstractmethod +import asyncio +from datetime import datetime +import logging +import multiprocessing +from pathlib import Path +import aiohttp +from models.listing import FurnishType, Listing, ListingSite, RentListing +from rec import floorplan +from rec.query import detail_query +from repositories.listing_repository import ListingRepository + +logger = logging.getLogger("uvicorn.error") + + +class ListingProcessor: + semaphore: asyncio.Semaphore + process_steps: list[Step] + + def __init__(self, listing_repository: ListingRepository): + self.semaphore = asyncio.Semaphore(20) + # Register new processing steps here + # Order is important + self.process_steps = [ + FetchListingDetailsStep(listing_repository), + FetchImagesStep(listing_repository), + DetectFloorplanStep(listing_repository), + ] + + async def process_listing(self, listing_id: int) -> Listing | None: + listing = None + for step in self.process_steps: + if await step.needs_processing(listing_id): + async with self.semaphore: + listing = await step.process(listing_id) + return listing + + async def listing_exists(self, listing_id: int) -> bool: ... + + +class Step: + listing_repository: ListingRepository + + def __init__(self, listing_repository: ListingRepository): + self.listing_repository = listing_repository + + @abstractmethod + async def process(self, listing_id: int) -> Listing: ... 
+ + @abstractmethod + async def needs_processing(self, listing_id: int) -> bool: + return True + + +class FetchListingDetailsStep(Step): + async def process(self, listing_id: int) -> Listing: + logger.debug(f"Fetching details for {listing_id=}") + existing_listings = await self.listing_repository.get_listings( + only_ids=[listing_id] + ) + now = datetime.now() + if len(existing_listings) > 0: + # listing exists, do not refresh + return existing_listings[0] + listing_details = await detail_query(listing_id) + + furnish_type_str = listing_details["property"].get("letFurnishType", "unknown") + if furnish_type_str is None: + furnish_type_str = "unknown" + elif "landlord" in furnish_type_str.lower(): + furnish_type_str = "ask landlord" + else: + furnish_type_str = furnish_type_str.lower() + furnish_type = FurnishType(furnish_type_str) + + available_from: datetime | None = None + available_from_str: str | None = listing_details["property"]["letDateAvailable"] + if available_from_str is None: + available_from = None + elif available_from_str.lower() == "now": + available_from = datetime.now() + else: + try: + available_from = datetime.strptime(available_from_str, "%d/%m/%Y") + except ValueError: + # If the date format is not as expected, return None + available_from = None + + photos = listing_details["property"]["photos"] + # listing = Listing( + listing = RentListing( # TODO: should pick based on price? + id=listing_id, + price=listing_details["property"]["price"], + number_of_bedrooms=listing_details["property"]["bedrooms"], + square_meters=None, # populated later + agency=listing_details["property"]["branch"]["brandName"], + council_tax_band=listing_details["property"]["councilTaxInfo"]["content"][ + 0 + ]["value"], + longtitude=listing_details["property"]["longitude"], + latitude=listing_details["property"]["latitude"], + price_history_json="{}", # TODO: should upsert from existing + listing_site=ListingSite.RIGHTMOVE, + last_seen=now, + photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None, + furnish_type=furnish_type, + available_from=available_from, + additional_info=listing_details, + ) + await self.listing_repository.upsert_listings([listing]) + logger.debug(f"Completed fetching details for {listing_id=}") + # TODO: dump to filesystem + return listing + + +class FetchImagesStep(Step): + async def process(self, listing_id: int) -> Listing: + logger.debug(f"Fetching images for {listing_id=}") + existing_listings = await self.listing_repository.get_listings( + only_ids=[listing_id] + ) + if len(existing_listings) == 0: + raise ValueError(f"Listing {listing_id} not found") + listing = existing_listings[0] + + base_path = Path("data/rs/") + all_floorplans = listing.additional_info.get("property", {}).get( + "floorplans", [] + ) + for floorplan in all_floorplans: + url = floorplan["url"] + picname = url.split("/")[-1] + floorplan_path = Path(base_path, str(listing.id), "floorplans", picname) + if floorplan_path.exists(): + continue + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 404: + return listing + if response.status != 200: + raise Exception(f"Error for {url}: {response.status}") + floorplan_path.parent.mkdir(parents=True, exist_ok=True) + with open(floorplan_path, "wb") as f: + f.write(await response.read()) + listing.floorplan_image_paths.append(str(floorplan_path)) + await self.listing_repository.upsert_listings([listing]) + logger.debug(f"Completed fetching images for {listing_id=}") + return listing 
+ + +class DetectFloorplanStep(Step): + ocr_semaphore: asyncio.Semaphore + + def __init__(self, listing_repository: ListingRepository): + super().__init__(listing_repository) + self.ocr_semaphore = asyncio.Semaphore(multiprocessing.cpu_count() // 4) + + async def needs_processing(self, listing_id: int) -> bool: + listings = await self.listing_repository.get_listings(only_ids=[listing_id]) + if len(listings) == 0: + return False + return listings[0].square_meters is None + + async def process(self, listing_id: int) -> Listing: + logger.debug(f"Running floorplan detection for {listing_id=}") + listings = await self.listing_repository.get_listings(only_ids=[listing_id]) + if len(listings) == 0: + raise ValueError(f"Listing {listing_id} does not exist") + listing = listings[0] + sqms = [] + for floorplan_path in listing.floorplan_image_paths: + async with self.ocr_semaphore: + estimated_sqm, _ = await asyncio.to_thread( + floorplan.calculate_ocr, floorplan_path + ) + if estimated_sqm is not None: + sqms.append(estimated_sqm) + max_sqm = max(sqms, default=0) # try once, if we fail, keep as 0 + # if max_sqm is not None: + listing.square_meters = max_sqm + await self.listing_repository.upsert_listings([listing]) + logger.debug(f"Completed running floorplan detection for {listing_id=}") + return listing diff --git a/crawler/main.py b/crawler/main.py index 1135278..fbe10ea 100644 --- a/crawler/main.py +++ b/crawler/main.py @@ -6,6 +6,7 @@ import pathlib import click import importlib +import listing_processor from models.listing import FurnishType, ListingType, QueryParameters from rec.districts import get_districts from data_access import Listing diff --git a/crawler/notifications.py b/crawler/notifications.py index 735fd7e..50c7ee3 100644 --- a/crawler/notifications.py +++ b/crawler/notifications.py @@ -17,7 +17,7 @@ class Slack(Surface): @lru_cache(maxsize=None) def get_notifier(surfaces: list[Surface] | None = None) -> apprise.Apprise: - surfaces = surfaces or [Slack()] + surfaces = surfaces or list[Surface]([Slack()]) obj = apprise.Apprise() for surface in surfaces: if conn := surface.connection_string(): diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py index f31a0dd..a64b5bf 100644 --- a/crawler/tasks/listing_tasks.py +++ b/crawler/tasks/listing_tasks.py @@ -1,16 +1,17 @@ import asyncio import importlib +import itertools import logging -from pathlib import Path from typing import Any -from celery import Celery, Task +from celery import Task from celery_app import app -from models.listing import FurnishType, Listing, ListingType, QueryParameters +from listing_processor import ListingProcessor +from models.listing import Listing, ListingType, QueryParameters +from rec.districts import get_districts +from rec.query import listing_query from repositories.listing_repository import ListingRepository from database import engine -from tasks.task_state import TaskStatus -dump_listings_module = importlib.import_module("1_dump_listings") dump_images_module = importlib.import_module("3_dump_images") detect_floorplan_module = importlib.import_module("4_detect_floorplan") @@ -20,28 +21,61 @@ logger = logging.getLogger("uvicorn.error") @app.task(bind=True, pydantic=True) def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]: parsed_parameters = QueryParameters.model_validate_json(parameters_json) - asyncio.run(dump_listings_full(self, parsed_parameters)) - return {"progress": 1} + asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters)) + 
self.update_state(state="Starting...", meta={"progress": 0}) + return {"progress": 0} -async def dump_listings_full(self: Task, parameters: QueryParameters) -> list[Listing]: +async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]: + parsed_parameters = QueryParameters.model_validate_json(parameters_json) + await dump_listings_full(task=Task(), parameters=parsed_parameters) + return {"progress": 0} + + +async def dump_listings_full( + *, task: Task, parameters: QueryParameters +) -> list[Listing]: """Fetches all listings, images as well as detects floorplans""" - self.update_state(state="FETCHING_LISTINGS", meta={"progress": 0.1}) repository = ListingRepository(engine) - new_listings = await dump_listings_module.dump_listings(parameters, repository) - self.update_state(state="FETCHING_FLOORPLANS", meta={"progress": 0.3}) - logger.debug(f"Upserted {len(new_listings)} new listings") - logger.debug("Starting to fetch floorplans") - await dump_images_module.dump_images(repository) - self.update_state(state="RUNNING_OCR_ON_FLOORPLANS", meta={"progress": 0.6}) - logger.debug("Completed fetching floorplans") - logger.debug("Starting floorplan detection") - await detect_floorplan_module.detect_floorplan(repository) - logger.debug("Completed floorplan detection") - # refresh listings - listings = await repository.get_listings(parameters) # this can be better - new_listings = [l for l in listings if l.id in new_listings] - return new_listings + + missing_ids = await get_missing_listing_ids(parameters, repository) + logger.info(f"Found {len(missing_ids)} missing listings") + + listing_processor = ListingProcessor(repository) + logger.info(f"Starting processing {len(missing_ids)} new listings") + return await dump_listings_and_monitor( + task=task, listing_processor=listing_processor, missing_ids=missing_ids + ) + + +async def dump_listings_and_monitor( + *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int] +) -> list[Listing]: + task_progress = {missing_id: 0 for missing_id in missing_ids} + + async def process(missing_id: int) -> Listing | None: + listing = await listing_processor.process_listing(missing_id) + task_progress[missing_id] = 1 + return listing + + async def monitor() -> None: + while (progress := sum(task_progress.values())) < len(missing_ids): + progress_ratio = progress / len(missing_ids) + logger.error( + f"Task progress: {task_progress}% ({progress} out of {len(missing_ids)})" + ) + task.update_state( + state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})", + meta={"progress": progress_ratio}, + ) + await asyncio.sleep(1) + + processed_listings = await asyncio.gather( + *[process(id) for id in missing_ids], *[monitor()] + ) + filtered_listings = [l for l in processed_listings if l is not None] + + return filtered_listings @app.on_after_finalize.connect @@ -57,5 +91,105 @@ def setup_periodic_tasks(sender, **kwargs): max_price=4000, ).model_dump_json() ), - name='Daily dump of interesting rent listings', + name="Daily dump of interesting rent listings", ) + + +async def get_missing_listing_ids( + parameters: QueryParameters, + repository: ListingRepository, +) -> set[int]: + semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections + districts = await get_valid_districts_to_scrape(parameters.district_names) + json_responses: list[list[dict[str, Any]]] = await asyncio.gather( + *[ + _fetch_listings_with_semaphore(semaphore, parameters, district) + for district in districts.keys() + ], + ) + 
json_responses_flat = list(itertools.chain.from_iterable(json_responses)) + logger.debug(f"Total listings fetched {len(json_responses_flat)}") + + identifiers: set[int] = set() + for response_json in json_responses_flat: + if response_json == {}: + continue + if response_json["totalAvailableResults"] == 0: + continue + for property in response_json["properties"]: + identifier = property["identifier"] + identifiers.add(identifier) + + # if listing is already in db, do not fetch details again + all_listing_ids = {l.id for l in await repository.get_listings()} + missing_ids = all_listing_ids - identifiers + return missing_ids + + +async def get_valid_districts_to_scrape( + district_names: set[str] | None, +) -> dict[str, str]: + if district_names: + districts = { + district: locid + for district, locid in get_districts().items() + if district in district_names + } + else: + districts = get_districts() + return districts + + +async def _fetch_listings_with_semaphore( + semaphore: asyncio.Semaphore, + parameters: QueryParameters, + district: str, +) -> list[dict[str, Any]]: + result = [] + # split the price in N bands to avoid the 1.5k capping by rightmove + # basically instead of 1 query with price between 1k and 5k that is capped at 1500 results + # we do 10 queries each with an increment in price range so we send more queries but each + # has a smaller chance of returning more than 1.5k results + + number_of_steps = 1 + price_step = parameters.max_price // number_of_steps + + for step in range(number_of_steps): + min_price = step * price_step + max_price = (step + 1) * price_step + logger.debug( + f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}" + ) + + for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1): + for page_id in range( + 1, + 3, # seems like all searches stop at 1500 entries (page_id * page_size) + ): + logger.debug(f"Processing {page_id=} for {district=}") + + async with semaphore: + try: + listing_query_result = await listing_query( + page=page_id, + channel=parameters.listing_type, + # min_bedrooms=parameters.min_bedrooms, + # max_bedrooms=parameters.max_bedrooms, + min_bedrooms=num_bedrooms, + max_bedrooms=num_bedrooms, + radius=parameters.radius, + min_price=min_price, + max_price=max_price, + district=district, + page_size=parameters.page_size, + max_days_since_added=parameters.max_days_since_added, + furnish_types=parameters.furnish_types or [], + ) + + except Exception as e: + if "GENERIC_ERROR" in str(e): # Too big page id + logger.debug(f"Max page id for {district=}: {page_id-1}") + break + raise e + result.append(listing_query_result) + return result From 87efe0694c6032815a633ea74d3a55d7195954ed Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 27 Jul 2025 18:47:09 +0000 Subject: [PATCH 08/10] format progress to 2 digits and add status updates before starting --- crawler/tasks/listing_tasks.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py index a64b5bf..ecf3626 100644 --- a/crawler/tasks/listing_tasks.py +++ b/crawler/tasks/listing_tasks.py @@ -21,8 +21,8 @@ logger = logging.getLogger("uvicorn.error") @app.task(bind=True, pydantic=True) def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]: parsed_parameters = QueryParameters.model_validate_json(parameters_json) - asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters)) self.update_state(state="Starting...", meta={"progress": 0}) + 
asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
     return {"progress": 0}
 
 
@@ -38,6 +38,7 @@ async def dump_listings_full(
     """Fetches all listings, images as well as detects floorplans"""
 
     repository = ListingRepository(engine)
+    task.update_state(state="Identifying missing listings", meta={"progress": 0})
     missing_ids = await get_missing_listing_ids(parameters, repository)
     logger.info(f"Found {len(missing_ids)} missing listings")
 
@@ -60,7 +61,7 @@ async def dump_listings_and_monitor(
 
     async def monitor() -> None:
         while (progress := sum(task_progress.values())) < len(missing_ids):
-            progress_ratio = progress / len(missing_ids)
+            progress_ratio = round(progress / len(missing_ids), 2)
             logger.error(
                 f"Task progress: {task_progress}% ({progress} out of {len(missing_ids)})"
             )

From d1cef99c5a34d8d14ddc2bbf052bb660e1910a1d Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sun, 27 Jul 2025 20:09:41 +0000
Subject: [PATCH 09/10] make task processing a bit better. still doing 1 query
 to check if needs processing; will fix later

---
 crawler/listing_processor.py   | 17 +++++++++++++++
 crawler/tasks/listing_tasks.py | 31 ++++++++++++++++++++++---------
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/crawler/listing_processor.py b/crawler/listing_processor.py
index 6955050..91943b3 100644
--- a/crawler/listing_processor.py
+++ b/crawler/listing_processor.py
@@ -54,6 +54,14 @@ class Step:
 
 
 class FetchListingDetailsStep(Step):
+    async def needs_processing(self, listing_id: int) -> bool:
+        existing_listings = await self.listing_repository.get_listings(
+            only_ids=[listing_id]
+        )
+        if len(existing_listings) == 0:
+            return True
+        return False
+
     async def process(self, listing_id: int) -> Listing:
         logger.debug(f"Fetching details for {listing_id=}")
         existing_listings = await self.listing_repository.get_listings(
@@ -115,6 +123,15 @@ class FetchListingDetailsStep(Step):
 
 
 class FetchImagesStep(Step):
+    async def needs_processing(self, listing_id: int) -> bool:
+        existing_listings = await self.listing_repository.get_listings(
+            only_ids=[listing_id]
+        )
+        if len(existing_listings) == 0:
+            return False  # if listing doesn't exist, we can't process it
+        listing = existing_listings[0]
+        return len(listing.floorplan_image_paths) == 0
+
     async def process(self, listing_id: int) -> Listing:
         logger.debug(f"Fetching images for {listing_id=}")
         existing_listings = await self.listing_repository.get_listings(
diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py
index ecf3626..3738d32 100644
--- a/crawler/tasks/listing_tasks.py
+++ b/crawler/tasks/listing_tasks.py
@@ -39,13 +39,15 @@ async def dump_listings_full(
 
     repository = ListingRepository(engine)
     task.update_state(state="Identifying missing listings", meta={"progress": 0})
-    missing_ids = await get_missing_listing_ids(parameters, repository)
-    logger.info(f"Found {len(missing_ids)} missing listings")
+    ids_to_process = await get_ids_to_process(
+        parameters=parameters, repository=repository, task=task
+    )
+    logger.info(f"Found {len(ids_to_process)} listings to process")
 
     listing_processor = ListingProcessor(repository)
-    logger.info(f"Starting processing {len(missing_ids)} new
listings") + logger.info(f"Starting processing {len(ids_to_process)} listings") return await dump_listings_and_monitor( - task=task, listing_processor=listing_processor, missing_ids=missing_ids + task=task, listing_processor=listing_processor, missing_ids=ids_to_process ) @@ -63,7 +65,7 @@ async def dump_listings_and_monitor( while (progress := sum(task_progress.values())) < len(missing_ids): progress_ratio = round(progress / len(missing_ids), 2) logger.error( - f"Task progress: {task_progress}% ({progress} out of {len(missing_ids)})" + f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})" ) task.update_state( state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})", @@ -96,15 +98,20 @@ def setup_periodic_tasks(sender, **kwargs): ) -async def get_missing_listing_ids( +async def get_ids_to_process( + *, parameters: QueryParameters, repository: ListingRepository, + task: Task, ) -> set[int]: semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections districts = await get_valid_districts_to_scrape(parameters.district_names) + task.update_state(state="Fetching listings to scrape", meta={"progress": 0}) json_responses: list[list[dict[str, Any]]] = await asyncio.gather( *[ - _fetch_listings_with_semaphore(semaphore, parameters, district) + _fetch_listings_with_semaphore( + task=task, semaphore=semaphore, parameters=parameters, district=district + ) for district in districts.keys() ], ) @@ -123,8 +130,8 @@ async def get_missing_listing_ids( # if listing is already in db, do not fetch details again all_listing_ids = {l.id for l in await repository.get_listings()} - missing_ids = all_listing_ids - identifiers - return missing_ids + all_ids = all_listing_ids.union(identifiers) + return all_ids async def get_valid_districts_to_scrape( @@ -142,6 +149,8 @@ async def get_valid_districts_to_scrape( async def _fetch_listings_with_semaphore( + *, + task: Task, semaphore: asyncio.Semaphore, parameters: QueryParameters, district: str, @@ -156,6 +165,10 @@ async def _fetch_listings_with_semaphore( price_step = parameters.max_price // number_of_steps for step in range(number_of_steps): + task.update_state( + state=f"Fetching listings ({step} out of {number_of_steps})", + meta={"progress": step / number_of_steps}, + ) min_price = step * price_step max_price = (step + 1) * price_step logger.debug( From e5c68f6bb72c5aa95a37b5c30053aa0dc03a17c7 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 2 Aug 2025 17:25:56 +0000 Subject: [PATCH 10/10] add opentelemetry --- crawler/api/metrics.py | 17 +++ crawler/poetry.lock | 312 ++++++++++++++++++++++++++++++++++++++++- crawler/pyproject.toml | 5 + 3 files changed, 333 insertions(+), 1 deletion(-) create mode 100644 crawler/api/metrics.py diff --git a/crawler/api/metrics.py b/crawler/api/metrics.py new file mode 100644 index 0000000..1baae9e --- /dev/null +++ b/crawler/api/metrics.py @@ -0,0 +1,17 @@ +# metrics.py +from opentelemetry.metrics import set_meter_provider +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.exporter.prometheus import PrometheusMetricReader +from prometheus_client import make_asgi_app + +# Set up Prometheus reader and meter provider +reader = PrometheusMetricReader() +provider = MeterProvider( + metric_readers=[reader], + resource=Resource.create({SERVICE_NAME: "fastapi-metrics-app"}), +) +set_meter_provider(provider) + +# Expose the Prometheus metrics endpoint +metrics_app = 
make_asgi_app() # Exposes /metrics diff --git a/crawler/poetry.lock b/crawler/poetry.lock index ef1119a..36831aa 100644 --- a/crawler/poetry.lock +++ b/crawler/poetry.lock @@ -310,6 +310,21 @@ types-python-dateutil = ">=2.8.10" doc = ["doc8", "sphinx (>=7.0.0)", "sphinx-autobuild", "sphinx-autodoc-typehints", "sphinx_rtd_theme (>=1.3.0)"] test = ["dateparser (==1.*)", "pre-commit", "pytest", "pytest-cov", "pytest-mock", "pytz (==2021.1)", "simplejson (==3.*)"] +[[package]] +name = "asgiref" +version = "3.9.1" +description = "ASGI specs, helper code, and adapters" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "asgiref-3.9.1-py3-none-any.whl", hash = "sha256:f3bba7092a48005b5f5bacd747d36ee4a5a61f4a269a6df590b43144355ebd2c"}, + {file = "asgiref-3.9.1.tar.gz", hash = "sha256:a5ab6582236218e5ef1648f242fd9f10626cfd4de8dc377db215d5d5098e3142"}, +] + +[package.extras] +tests = ["mypy (>=1.14.0)", "pytest", "pytest-asyncio"] + [[package]] name = "asttokens" version = "3.0.0" @@ -1699,6 +1714,30 @@ files = [ [package.extras] all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"] +[[package]] +name = "importlib-metadata" +version = "8.7.0" +description = "Read metadata from Python packages" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "importlib_metadata-8.7.0-py3-none-any.whl", hash = "sha256:e5dd1551894c77868a30651cef00984d50e1002d06942a7101d34870c5f02afd"}, + {file = "importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000"}, +] + +[package.dependencies] +zipp = ">=3.20" + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +enabler = ["pytest-enabler (>=2.2)"] +perf = ["ipython"] +test = ["flufl.flake8", "importlib_resources (>=1.3) ; python_version < \"3.9\"", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] +type = ["pytest-mypy"] + [[package]] name = "ipdb" version = "0.13.13" @@ -2828,6 +2867,168 @@ numpy = [ {version = ">=1.23.5", markers = "python_version == \"3.11\""}, ] +[[package]] +name = "opentelemetry-api" +version = "1.36.0" +description = "OpenTelemetry Python API" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_api-1.36.0-py3-none-any.whl", hash = "sha256:02f20bcacf666e1333b6b1f04e647dc1d5111f86b8e510238fcc56d7762cda8c"}, + {file = "opentelemetry_api-1.36.0.tar.gz", hash = "sha256:9a72572b9c416d004d492cbc6e61962c0501eaf945ece9b5a0f56597d8348aa0"}, +] + +[package.dependencies] +importlib-metadata = ">=6.0,<8.8.0" +typing-extensions = ">=4.5.0" + +[[package]] +name = "opentelemetry-exporter-prometheus" +version = "0.57b0" +description = "Prometheus Metric Exporter for OpenTelemetry" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_exporter_prometheus-0.57b0-py3-none-any.whl", hash = "sha256:c5b893d1cdd593fb022af2c7de3258c2d5a4d04402ae80d9fa35675fed77f05c"}, + {file = "opentelemetry_exporter_prometheus-0.57b0.tar.gz", hash = "sha256:9eb15bdc189235cf03c3f93abf56f8ff0ab57a493a189263bd7fe77a4249e689"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.12,<2.0" +opentelemetry-sdk = ">=1.36.0,<1.37.0" +prometheus-client = ">=0.5.0,<1.0.0" + +[[package]] +name = 
"opentelemetry-instrumentation" +version = "0.57b0" +description = "Instrumentation Tools & Auto Instrumentation for OpenTelemetry Python" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_instrumentation-0.57b0-py3-none-any.whl", hash = "sha256:9109280f44882e07cec2850db28210b90600ae9110b42824d196de357cbddf7e"}, + {file = "opentelemetry_instrumentation-0.57b0.tar.gz", hash = "sha256:f2a30135ba77cdea2b0e1df272f4163c154e978f57214795d72f40befd4fcf05"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.4,<2.0" +opentelemetry-semantic-conventions = "0.57b0" +packaging = ">=18.0" +wrapt = ">=1.0.0,<2.0.0" + +[[package]] +name = "opentelemetry-instrumentation-asgi" +version = "0.57b0" +description = "ASGI instrumentation for OpenTelemetry" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_instrumentation_asgi-0.57b0-py3-none-any.whl", hash = "sha256:47debbde6af066a7e8e911f7193730d5e40d62effc1ac2e1119908347790a3ea"}, + {file = "opentelemetry_instrumentation_asgi-0.57b0.tar.gz", hash = "sha256:a6f880b5d1838f65688fc992c65fbb1d3571f319d370990c32e759d3160e510b"}, +] + +[package.dependencies] +asgiref = ">=3.0,<4.0" +opentelemetry-api = ">=1.12,<2.0" +opentelemetry-instrumentation = "0.57b0" +opentelemetry-semantic-conventions = "0.57b0" +opentelemetry-util-http = "0.57b0" + +[package.extras] +instruments = ["asgiref (>=3.0,<4.0)"] + +[[package]] +name = "opentelemetry-instrumentation-fastapi" +version = "0.57b0" +description = "OpenTelemetry FastAPI Instrumentation" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_instrumentation_fastapi-0.57b0-py3-none-any.whl", hash = "sha256:61e6402749ffe0bfec582e58155e0d81dd38723cd9bc4562bca1acca80334006"}, + {file = "opentelemetry_instrumentation_fastapi-0.57b0.tar.gz", hash = "sha256:73ac22f3c472a8f9cb21d1fbe5a4bf2797690c295fff4a1c040e9b1b1688a105"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.12,<2.0" +opentelemetry-instrumentation = "0.57b0" +opentelemetry-instrumentation-asgi = "0.57b0" +opentelemetry-semantic-conventions = "0.57b0" +opentelemetry-util-http = "0.57b0" + +[package.extras] +instruments = ["fastapi (>=0.92,<1.0)"] + +[[package]] +name = "opentelemetry-instrumentation-sqlalchemy" +version = "0.57b0" +description = "OpenTelemetry SQLAlchemy instrumentation" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_instrumentation_sqlalchemy-0.57b0-py3-none-any.whl", hash = "sha256:8a1a815331cb04fc95aa7c50e9c681cdccfb12e1fa4522f079fe4b24753ae106"}, + {file = "opentelemetry_instrumentation_sqlalchemy-0.57b0.tar.gz", hash = "sha256:95667326b7cc22bb4bc9941f98ca22dd177679f9a4d277646cc21074c0d732ff"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.12,<2.0" +opentelemetry-instrumentation = "0.57b0" +opentelemetry-semantic-conventions = "0.57b0" +packaging = ">=21.0" +wrapt = ">=1.11.2" + +[package.extras] +instruments = ["sqlalchemy (>=1.0.0,<2.1.0)"] + +[[package]] +name = "opentelemetry-sdk" +version = "1.36.0" +description = "OpenTelemetry Python SDK" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_sdk-1.36.0-py3-none-any.whl", hash = "sha256:19fe048b42e98c5c1ffe85b569b7073576ad4ce0bcb6e9b4c6a39e890a6c45fb"}, + {file = "opentelemetry_sdk-1.36.0.tar.gz", hash = "sha256:19c8c81599f51b71670661ff7495c905d8fdf6976e41622d5245b791b06fa581"}, +] + +[package.dependencies] +opentelemetry-api = 
"1.36.0" +opentelemetry-semantic-conventions = "0.57b0" +typing-extensions = ">=4.5.0" + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.57b0" +description = "OpenTelemetry Semantic Conventions" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_semantic_conventions-0.57b0-py3-none-any.whl", hash = "sha256:757f7e76293294f124c827e514c2a3144f191ef175b069ce8d1211e1e38e9e78"}, + {file = "opentelemetry_semantic_conventions-0.57b0.tar.gz", hash = "sha256:609a4a79c7891b4620d64c7aac6898f872d790d75f22019913a660756f27ff32"}, +] + +[package.dependencies] +opentelemetry-api = "1.36.0" +typing-extensions = ">=4.5.0" + +[[package]] +name = "opentelemetry-util-http" +version = "0.57b0" +description = "Web util for OpenTelemetry" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "opentelemetry_util_http-0.57b0-py3-none-any.whl", hash = "sha256:e54c0df5543951e471c3d694f85474977cd5765a3b7654398c83bab3d2ffb8e9"}, + {file = "opentelemetry_util_http-0.57b0.tar.gz", hash = "sha256:f7417595ead0eb42ed1863ec9b2f839fc740368cd7bbbfc1d0a47bc1ab0aba11"}, +] + [[package]] name = "overrides" version = "7.7.0" @@ -5122,6 +5323,95 @@ files = [ {file = "websockets-15.0.1.tar.gz", hash = "sha256:82544de02076bafba038ce055ee6412d68da13ab47f0c60cab827346de828dee"}, ] +[[package]] +name = "wrapt" +version = "1.17.2" +description = "Module for decorators, wrappers and monkey patching." +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "wrapt-1.17.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3d57c572081fed831ad2d26fd430d565b76aa277ed1d30ff4d40670b1c0dd984"}, + {file = "wrapt-1.17.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b5e251054542ae57ac7f3fba5d10bfff615b6c2fb09abeb37d2f1463f841ae22"}, + {file = "wrapt-1.17.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:80dd7db6a7cb57ffbc279c4394246414ec99537ae81ffd702443335a61dbf3a7"}, + {file = "wrapt-1.17.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0a6e821770cf99cc586d33833b2ff32faebdbe886bd6322395606cf55153246c"}, + {file = "wrapt-1.17.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b60fb58b90c6d63779cb0c0c54eeb38941bae3ecf7a73c764c52c88c2dcb9d72"}, + {file = "wrapt-1.17.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b870b5df5b71d8c3359d21be8f0d6c485fa0ebdb6477dda51a1ea54a9b558061"}, + {file = "wrapt-1.17.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:4011d137b9955791f9084749cba9a367c68d50ab8d11d64c50ba1688c9b457f2"}, + {file = "wrapt-1.17.2-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:1473400e5b2733e58b396a04eb7f35f541e1fb976d0c0724d0223dd607e0f74c"}, + {file = "wrapt-1.17.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:3cedbfa9c940fdad3e6e941db7138e26ce8aad38ab5fe9dcfadfed9db7a54e62"}, + {file = "wrapt-1.17.2-cp310-cp310-win32.whl", hash = "sha256:582530701bff1dec6779efa00c516496968edd851fba224fbd86e46cc6b73563"}, + {file = "wrapt-1.17.2-cp310-cp310-win_amd64.whl", hash = "sha256:58705da316756681ad3c9c73fd15499aa4d8c69f9fd38dc8a35e06c12468582f"}, + {file = "wrapt-1.17.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:ff04ef6eec3eee8a5efef2401495967a916feaa353643defcc03fc74fe213b58"}, + {file = "wrapt-1.17.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = 
"sha256:4db983e7bca53819efdbd64590ee96c9213894272c776966ca6306b73e4affda"}, + {file = "wrapt-1.17.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:9abc77a4ce4c6f2a3168ff34b1da9b0f311a8f1cfd694ec96b0603dff1c79438"}, + {file = "wrapt-1.17.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0b929ac182f5ace000d459c59c2c9c33047e20e935f8e39371fa6e3b85d56f4a"}, + {file = "wrapt-1.17.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f09b286faeff3c750a879d336fb6d8713206fc97af3adc14def0cdd349df6000"}, + {file = "wrapt-1.17.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1a7ed2d9d039bd41e889f6fb9364554052ca21ce823580f6a07c4ec245c1f5d6"}, + {file = "wrapt-1.17.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:129a150f5c445165ff941fc02ee27df65940fcb8a22a61828b1853c98763a64b"}, + {file = "wrapt-1.17.2-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1fb5699e4464afe5c7e65fa51d4f99e0b2eadcc176e4aa33600a3df7801d6662"}, + {file = "wrapt-1.17.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:9a2bce789a5ea90e51a02dfcc39e31b7f1e662bc3317979aa7e5538e3a034f72"}, + {file = "wrapt-1.17.2-cp311-cp311-win32.whl", hash = "sha256:4afd5814270fdf6380616b321fd31435a462019d834f83c8611a0ce7484c7317"}, + {file = "wrapt-1.17.2-cp311-cp311-win_amd64.whl", hash = "sha256:acc130bc0375999da18e3d19e5a86403667ac0c4042a094fefb7eec8ebac7cf3"}, + {file = "wrapt-1.17.2-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:d5e2439eecc762cd85e7bd37161d4714aa03a33c5ba884e26c81559817ca0925"}, + {file = "wrapt-1.17.2-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:3fc7cb4c1c744f8c05cd5f9438a3caa6ab94ce8344e952d7c45a8ed59dd88392"}, + {file = "wrapt-1.17.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8fdbdb757d5390f7c675e558fd3186d590973244fab0c5fe63d373ade3e99d40"}, + {file = "wrapt-1.17.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5bb1d0dbf99411f3d871deb6faa9aabb9d4e744d67dcaaa05399af89d847a91d"}, + {file = "wrapt-1.17.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d18a4865f46b8579d44e4fe1e2bcbc6472ad83d98e22a26c963d46e4c125ef0b"}, + {file = "wrapt-1.17.2-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc570b5f14a79734437cb7b0500376b6b791153314986074486e0b0fa8d71d98"}, + {file = "wrapt-1.17.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6d9187b01bebc3875bac9b087948a2bccefe464a7d8f627cf6e48b1bbae30f82"}, + {file = "wrapt-1.17.2-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:9e8659775f1adf02eb1e6f109751268e493c73716ca5761f8acb695e52a756ae"}, + {file = "wrapt-1.17.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e8b2816ebef96d83657b56306152a93909a83f23994f4b30ad4573b00bd11bb9"}, + {file = "wrapt-1.17.2-cp312-cp312-win32.whl", hash = "sha256:468090021f391fe0056ad3e807e3d9034e0fd01adcd3bdfba977b6fdf4213ea9"}, + {file = "wrapt-1.17.2-cp312-cp312-win_amd64.whl", hash = "sha256:ec89ed91f2fa8e3f52ae53cd3cf640d6feff92ba90d62236a81e4e563ac0e991"}, + {file = "wrapt-1.17.2-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:6ed6ffac43aecfe6d86ec5b74b06a5be33d5bb9243d055141e8cabb12aa08125"}, + {file = "wrapt-1.17.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:35621ae4c00e056adb0009f8e86e28eb4a41a4bfa8f9bfa9fca7d343fe94f998"}, + {file = 
"wrapt-1.17.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a604bf7a053f8362d27eb9fefd2097f82600b856d5abe996d623babd067b1ab5"}, + {file = "wrapt-1.17.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5cbabee4f083b6b4cd282f5b817a867cf0b1028c54d445b7ec7cfe6505057cf8"}, + {file = "wrapt-1.17.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:49703ce2ddc220df165bd2962f8e03b84c89fee2d65e1c24a7defff6f988f4d6"}, + {file = "wrapt-1.17.2-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8112e52c5822fc4253f3901b676c55ddf288614dc7011634e2719718eaa187dc"}, + {file = "wrapt-1.17.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:9fee687dce376205d9a494e9c121e27183b2a3df18037f89d69bd7b35bcf59e2"}, + {file = "wrapt-1.17.2-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:18983c537e04d11cf027fbb60a1e8dfd5190e2b60cc27bc0808e653e7b218d1b"}, + {file = "wrapt-1.17.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:703919b1633412ab54bcf920ab388735832fdcb9f9a00ae49387f0fe67dad504"}, + {file = "wrapt-1.17.2-cp313-cp313-win32.whl", hash = "sha256:abbb9e76177c35d4e8568e58650aa6926040d6a9f6f03435b7a522bf1c487f9a"}, + {file = "wrapt-1.17.2-cp313-cp313-win_amd64.whl", hash = "sha256:69606d7bb691b50a4240ce6b22ebb319c1cfb164e5f6569835058196e0f3a845"}, + {file = "wrapt-1.17.2-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:4a721d3c943dae44f8e243b380cb645a709ba5bd35d3ad27bc2ed947e9c68192"}, + {file = "wrapt-1.17.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:766d8bbefcb9e00c3ac3b000d9acc51f1b399513f44d77dfe0eb026ad7c9a19b"}, + {file = "wrapt-1.17.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:e496a8ce2c256da1eb98bd15803a79bee00fc351f5dfb9ea82594a3f058309e0"}, + {file = "wrapt-1.17.2-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:40d615e4fe22f4ad3528448c193b218e077656ca9ccb22ce2cb20db730f8d306"}, + {file = "wrapt-1.17.2-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a5aaeff38654462bc4b09023918b7f21790efb807f54c000a39d41d69cf552cb"}, + {file = "wrapt-1.17.2-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9a7d15bbd2bc99e92e39f49a04653062ee6085c0e18b3b7512a4f2fe91f2d681"}, + {file = "wrapt-1.17.2-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:e3890b508a23299083e065f435a492b5435eba6e304a7114d2f919d400888cc6"}, + {file = "wrapt-1.17.2-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:8c8b293cd65ad716d13d8dd3624e42e5a19cc2a2f1acc74b30c2c13f15cb61a6"}, + {file = "wrapt-1.17.2-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:4c82b8785d98cdd9fed4cac84d765d234ed3251bd6afe34cb7ac523cb93e8b4f"}, + {file = "wrapt-1.17.2-cp313-cp313t-win32.whl", hash = "sha256:13e6afb7fe71fe7485a4550a8844cc9ffbe263c0f1a1eea569bc7091d4898555"}, + {file = "wrapt-1.17.2-cp313-cp313t-win_amd64.whl", hash = "sha256:eaf675418ed6b3b31c7a989fd007fa7c3be66ce14e5c3b27336383604c9da85c"}, + {file = "wrapt-1.17.2-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:5c803c401ea1c1c18de70a06a6f79fcc9c5acfc79133e9869e730ad7f8ad8ef9"}, + {file = "wrapt-1.17.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:f917c1180fdb8623c2b75a99192f4025e412597c50b2ac870f156de8fb101119"}, + {file = "wrapt-1.17.2-cp38-cp38-macosx_11_0_arm64.whl", hash = 
"sha256:ecc840861360ba9d176d413a5489b9a0aff6d6303d7e733e2c4623cfa26904a6"}, + {file = "wrapt-1.17.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bb87745b2e6dc56361bfde481d5a378dc314b252a98d7dd19a651a3fa58f24a9"}, + {file = "wrapt-1.17.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:58455b79ec2661c3600e65c0a716955adc2410f7383755d537584b0de41b1d8a"}, + {file = "wrapt-1.17.2-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b4e42a40a5e164cbfdb7b386c966a588b1047558a990981ace551ed7e12ca9c2"}, + {file = "wrapt-1.17.2-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:91bd7d1773e64019f9288b7a5101f3ae50d3d8e6b1de7edee9c2ccc1d32f0c0a"}, + {file = "wrapt-1.17.2-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:bb90fb8bda722a1b9d48ac1e6c38f923ea757b3baf8ebd0c82e09c5c1a0e7a04"}, + {file = "wrapt-1.17.2-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:08e7ce672e35efa54c5024936e559469436f8b8096253404faeb54d2a878416f"}, + {file = "wrapt-1.17.2-cp38-cp38-win32.whl", hash = "sha256:410a92fefd2e0e10d26210e1dfb4a876ddaf8439ef60d6434f21ef8d87efc5b7"}, + {file = "wrapt-1.17.2-cp38-cp38-win_amd64.whl", hash = "sha256:95c658736ec15602da0ed73f312d410117723914a5c91a14ee4cdd72f1d790b3"}, + {file = "wrapt-1.17.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:99039fa9e6306880572915728d7f6c24a86ec57b0a83f6b2491e1d8ab0235b9a"}, + {file = "wrapt-1.17.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2696993ee1eebd20b8e4ee4356483c4cb696066ddc24bd70bcbb80fa56ff9061"}, + {file = "wrapt-1.17.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:612dff5db80beef9e649c6d803a8d50c409082f1fedc9dbcdfde2983b2025b82"}, + {file = "wrapt-1.17.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:62c2caa1585c82b3f7a7ab56afef7b3602021d6da34fbc1cf234ff139fed3cd9"}, + {file = "wrapt-1.17.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c958bcfd59bacc2d0249dcfe575e71da54f9dcf4a8bdf89c4cb9a68a1170d73f"}, + {file = "wrapt-1.17.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fc78a84e2dfbc27afe4b2bd7c80c8db9bca75cc5b85df52bfe634596a1da846b"}, + {file = "wrapt-1.17.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:ba0f0eb61ef00ea10e00eb53a9129501f52385c44853dbd6c4ad3f403603083f"}, + {file = "wrapt-1.17.2-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:1e1fe0e6ab7775fd842bc39e86f6dcfc4507ab0ffe206093e76d61cde37225c8"}, + {file = "wrapt-1.17.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c86563182421896d73858e08e1db93afdd2b947a70064b813d515d66549e15f9"}, + {file = "wrapt-1.17.2-cp39-cp39-win32.whl", hash = "sha256:f393cda562f79828f38a819f4788641ac7c4085f30f1ce1a68672baa686482bb"}, + {file = "wrapt-1.17.2-cp39-cp39-win_amd64.whl", hash = "sha256:36ccae62f64235cf8ddb682073a60519426fdd4725524ae38874adf72b5f2aeb"}, + {file = "wrapt-1.17.2-py3-none-any.whl", hash = "sha256:b18f2d1533a71f069c7f82d524a52599053d4c7166e9dd374ae2136b7f40f7c8"}, + {file = "wrapt-1.17.2.tar.gz", hash = "sha256:41388e9d4d1522446fe79d3213196bd9e3b301a336965b9e27ca2788ebd122f3"}, +] + [[package]] name = "yarl" version = "1.20.1" @@ -5241,7 +5531,27 @@ idna = ">=2.0" multidict = ">=4.0" propcache = ">=0.2.1" +[[package]] +name = "zipp" +version = "3.23.0" +description = "Backport of pathlib-compatible object wrapper for zip files" +optional = false 
+python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e"}, + {file = "zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166"}, +] + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more_itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] +type = ["pytest-mypy"] + [metadata] lock-version = "2.1" python-versions = ">3.11" -content-hash = "110fae166c8a5b2f9c178f822097ee5f939d22f04445bab6ca945612e08a4ed8" +content-hash = "9356591426eaabc61359774a41a9f508f597fe844a023fb3b2291ba779cb4674" diff --git a/crawler/pyproject.toml b/crawler/pyproject.toml index 92e1a7f..0cd0ffe 100644 --- a/crawler/pyproject.toml +++ b/crawler/pyproject.toml @@ -33,6 +33,11 @@ celery = "^5.5.3" redis = "^6.2.0" watchdog = "^6.0.0" apprise = "^1.9.3" +opentelemetry-api = "^1.36.0" +opentelemetry-sdk = "^1.36.0" +opentelemetry-exporter-prometheus = "^0.57b0" +opentelemetry-instrumentation-fastapi = "^0.57b0" +opentelemetry-instrumentation-sqlalchemy = "^0.57b0" [tool.poetry.group.dev.dependencies] ipdb = "^0.13.13"
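
A note on the notification path introduced in PATCH 01 and hardened in PATCH 05: `Slack.connection_string()` now reads `SLACK_WEBHOOK_URL`, and `get_notifier()` is memoised with `lru_cache`, so the variable has to be exported before the first notification is sent or the notifier stays empty for the lifetime of the process. A minimal smoke test, assuming a placeholder webhook URL:

# Smoke test for crawler/notifications.py; the webhook below is a placeholder.
import asyncio
import os

# Must be set before the first send_notification() call, because
# get_notifier() caches the Apprise instance via lru_cache.
os.environ["SLACK_WEBHOOK_URL"] = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

from notifications import send_notification

# async_notify returns True only if every configured surface accepted the message
ok = asyncio.run(send_notification("listings refreshed", title="crawler"))
print(ok)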
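
The `Step` pipeline from PATCH 07 registers its stages in `ListingProcessor.__init__` ("Register new processing steps here", "Order is important"). A hypothetical extra stage sketched against that interface; `NotifyNewListingStep` and its message format are illustrative and not part of the series:

# Hypothetical pipeline stage following crawler/listing_processor.py's Step interface.
from listing_processor import Step
from models.listing import Listing
from notifications import send_notification


class NotifyNewListingStep(Step):
    """Pushes a Slack message once a listing has details and a floor area."""

    async def needs_processing(self, listing_id: int) -> bool:
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        # Runs after the earlier steps, so only fire once OCR has produced a size
        return len(listings) > 0 and listings[0].square_meters is not None

    async def process(self, listing_id: int) -> Listing:
        listing = (await self.listing_repository.get_listings(only_ids=[listing_id]))[0]
        await send_notification(
            f"Listing {listing.id}: {listing.number_of_bedrooms} bed, "
            f"{listing.square_meters} sqm, {listing.price} pcm"
        )
        return listing


# Registration: append NotifyNewListingStep(listing_repository) to
# ListingProcessor.process_steps at the end of __init__ (order matters).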
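
PATCH 10 builds `metrics_app` in `crawler/api/metrics.py` but the series does not show it being mounted, even though `opentelemetry-instrumentation-fastapi` is added to pyproject.toml. A sketch of the wiring in `crawler/api/app.py`, assuming the `app` instance defined there; the `listing_refresh_total` counter is illustrative:

# Sketch: wiring api/metrics.py into crawler/api/app.py (not shown in the patches).
from opentelemetry import metrics
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from api.metrics import metrics_app  # importing this sets the global MeterProvider

# `app` is the existing FastAPI instance defined in crawler/api/app.py
app.mount("/metrics", metrics_app)  # Prometheus scrapes this path
FastAPIInstrumentor.instrument_app(app)  # per-route request metrics/traces

# Illustrative custom metric for the refresh endpoint
meter = metrics.get_meter("crawler")
refresh_counter = meter.create_counter(
    "listing_refresh_total",
    description="Number of /api/listing/refresh requests",
)
# inside refresh_listings(): refresh_counter.add(1)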