Structured logging via JsonFormatter replaces uvicorn's default format so Loki can parse timestamps and fields. 14 business metrics (scrape stats, throttle events, circuit breaker state, cache hit rate, OCR success rate, Celery task lifecycle) are defined in a shared metrics module and instrumented across the scraper pipeline, API, and workers. Celery workers expose a Prometheus HTTP endpoint on configurable ports.
619 lines
22 KiB
Python
619 lines
22 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from collections import deque
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
from celery import Task
|
|
from celery.schedules import crontab
|
|
from celery_app import app
|
|
from config.schedule_config import SchedulesConfig
|
|
from config.scraper_config import ScraperConfig
|
|
from listing_processor import ListingProcessor
|
|
from models.listing import Listing, QueryParameters
|
|
from rec.query import create_session, listing_query
|
|
from rec.exceptions import CircuitBreakerOpenError, InvalidResponseError, ThrottlingError
|
|
from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
|
|
from repositories.listing_repository import ListingRepository
|
|
from database import engine
|
|
from services.query_splitter import QuerySplitter, SubQuery
|
|
from utils.redis_lock import redis_lock
|
|
from services.listing_cache import invalidate_cache
|
|
from services.task_progress_publisher import publish_task_progress
|
|
|
|
logger = logging.getLogger("uvicorn.error")
|
|
|
|
# Central logging is now configured in celery_app.py via logging_config
|
|
celery_logger = logging.getLogger("celery.task")
|
|
|
|
SCRAPE_LOCK_NAME = "scrape_listings"
|
|
LOG_BUFFER_MAX_LINES = 200
|
|
|
|
# Number of concurrent consumer workers that process listings from the queue.
|
|
NUM_WORKERS = 20
|
|
|
|
# Phase constants for task state reporting
|
|
PHASE_SPLITTING = "splitting"
|
|
PHASE_FETCHING = "fetching"
|
|
PHASE_PROCESSING = "processing"
|
|
PHASE_COMPLETED = "completed"
|
|
|
|
# Module-level log buffer — active only during task execution.
|
|
# This is safe as module-level mutable state because Celery workers use a
|
|
# prefork pool: each worker process handles one task at a time, so there is
|
|
# no concurrent access within a single process. The TaskLogHandler appends
|
|
# here; _update_task_state reads from here.
|
|
_active_log_buffer: deque[str] | None = None
|
|
|
|
|
|
@dataclass
|
|
class _PipelineState:
|
|
"""Shared mutable state for the streaming fetch-and-process pipeline."""
|
|
ids_collected: int = 0
|
|
completed_subqueries: int = 0
|
|
total_pages_fetched: int = 0
|
|
fetching_done: bool = False
|
|
processed_count: int = 0
|
|
failed_count: int = 0
|
|
details_fetched: int = 0
|
|
images_downloaded: int = 0
|
|
ocr_completed: int = 0
|
|
processed_listings: list[Listing] = field(default_factory=list)
|
|
|
|
|
|
class ProgressReporter:
|
|
"""Event-driven, throttled progress reporter.
|
|
|
|
Call ``notify()`` whenever pipeline state changes (page fetched, item
|
|
processed, phase change). Publishes are throttled to at most one every
|
|
``min_interval`` seconds to avoid flooding Redis / WebSocket. Use
|
|
``force=True`` for phase changes, completion, and errors so they are
|
|
published immediately.
|
|
|
|
``run_background_flush`` is a long-running coroutine that publishes every
|
|
``background_interval`` seconds during quiet periods (keeps ETA / elapsed
|
|
fields current even when no items are being processed).
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
task: Task,
|
|
state: _PipelineState,
|
|
subqueries_total: int,
|
|
start_time: float,
|
|
min_interval: float = 0.25,
|
|
background_interval: float = 2.0,
|
|
) -> None:
|
|
self._task = task
|
|
self._state = state
|
|
self._subqueries_total = subqueries_total
|
|
self._start_time = start_time
|
|
self._min_interval = min_interval
|
|
self._background_interval = background_interval
|
|
self._last_publish: float = 0.0
|
|
self._last_log_progress: float = 0.0
|
|
self._dirty = False
|
|
self._deferred: asyncio.TimerHandle | None = None
|
|
self._loop: asyncio.AbstractEventLoop | None = None
|
|
self._stopped = False
|
|
|
|
# -- internal helpers ---------------------------------------------------
|
|
|
|
def _build_meta(self) -> tuple[str, dict[str, Any]]:
|
|
state = self._state
|
|
total = state.ids_collected
|
|
done = state.processed_count + state.failed_count
|
|
|
|
phase = PHASE_PROCESSING if state.fetching_done else PHASE_FETCHING
|
|
progress_ratio = round(done / total, 2) if total > 0 else 0.0
|
|
|
|
elapsed = time.time() - self._start_time
|
|
rate = done / elapsed if elapsed > 0 else 0
|
|
remaining = (total - done) if total > 0 else 0
|
|
eta = remaining / rate if rate > 0 else 0
|
|
|
|
label = (
|
|
f"{'Processing' if state.fetching_done else 'Fetching & processing'}: "
|
|
f"{done}/{total}"
|
|
)
|
|
meta: dict[str, Any] = {
|
|
"phase": phase,
|
|
"progress": progress_ratio,
|
|
"processed": done,
|
|
"total": total,
|
|
"subqueries_completed": state.completed_subqueries,
|
|
"subqueries_total": self._subqueries_total,
|
|
"ids_collected": state.ids_collected,
|
|
"pages_fetched": state.total_pages_fetched,
|
|
"fetching_done": state.fetching_done,
|
|
"details_fetched": state.details_fetched,
|
|
"images_downloaded": state.images_downloaded,
|
|
"ocr_completed": state.ocr_completed,
|
|
"failed": state.failed_count,
|
|
"elapsed_seconds": round(elapsed, 1),
|
|
"rate_per_second": round(rate, 2),
|
|
"eta_seconds": round(eta, 1),
|
|
}
|
|
return label, meta
|
|
|
|
def _publish(self) -> None:
|
|
label, meta = self._build_meta()
|
|
|
|
# Log milestones at every 10% progress
|
|
progress_ratio = meta["progress"]
|
|
if progress_ratio >= self._last_log_progress + 0.1 or meta["processed"] == 1:
|
|
done = meta["processed"]
|
|
total = meta["total"]
|
|
elapsed = meta["elapsed_seconds"]
|
|
rate = meta["rate_per_second"]
|
|
eta = meta["eta_seconds"]
|
|
celery_logger.info(
|
|
f"Progress: {progress_ratio * 100:.0f}% "
|
|
f"({done}/{total}) "
|
|
f"| Elapsed: {elapsed:.0f}s "
|
|
f"| Rate: {rate:.1f}/s "
|
|
f"| ETA: {eta:.0f}s"
|
|
)
|
|
self._last_log_progress = progress_ratio
|
|
|
|
_update_task_state(self._task, label, meta)
|
|
self._last_publish = time.time()
|
|
self._dirty = False
|
|
|
|
def _cancel_deferred(self) -> None:
|
|
if self._deferred is not None:
|
|
self._deferred.cancel()
|
|
self._deferred = None
|
|
|
|
def _deferred_callback(self) -> None:
|
|
"""Called by asyncio.call_later when the throttle window expires."""
|
|
self._deferred = None
|
|
if self._dirty and not self._stopped:
|
|
self._publish()
|
|
|
|
# -- public API ---------------------------------------------------------
|
|
|
|
def notify(self, force: bool = False) -> None:
|
|
"""Signal that pipeline state has changed.
|
|
|
|
If ``force`` is True the publish happens immediately (use for phase
|
|
changes and completion). Otherwise the publish is throttled.
|
|
"""
|
|
if self._stopped:
|
|
return
|
|
|
|
self._dirty = True
|
|
now = time.time()
|
|
|
|
if force or (now - self._last_publish) >= self._min_interval:
|
|
self._cancel_deferred()
|
|
self._publish()
|
|
elif self._deferred is None:
|
|
# Schedule a deferred flush so trailing updates aren't lost
|
|
loop = self._loop or asyncio.get_event_loop()
|
|
delay = self._min_interval - (now - self._last_publish)
|
|
self._deferred = loop.call_later(delay, self._deferred_callback)
|
|
|
|
def flush(self) -> None:
|
|
"""Force a final publish (call at pipeline end)."""
|
|
self._cancel_deferred()
|
|
self._publish()
|
|
|
|
async def run_background_flush(self) -> None:
|
|
"""Background keepalive: publish every ``background_interval`` seconds.
|
|
|
|
This keeps ETA / elapsed fields current during quiet periods.
|
|
Exits when the pipeline is done (fetching complete and all items
|
|
processed).
|
|
"""
|
|
self._loop = asyncio.get_running_loop()
|
|
while True:
|
|
state = self._state
|
|
total = state.ids_collected
|
|
done = state.processed_count + state.failed_count
|
|
|
|
if state.fetching_done and done >= total and total > 0:
|
|
break
|
|
if state.fetching_done and total == 0:
|
|
break
|
|
|
|
await asyncio.sleep(self._background_interval)
|
|
|
|
# Publish if nothing else has published recently
|
|
if not self._stopped:
|
|
now = time.time()
|
|
if (now - self._last_publish) >= self._background_interval:
|
|
self._publish()
|
|
|
|
self._stopped = True
|
|
self._cancel_deferred()
|
|
|
|
|
|
class TaskLogHandler(logging.Handler):
|
|
"""Captures log records into a deque for inclusion in task state updates."""
|
|
|
|
def __init__(self, buffer: deque[str]) -> None:
|
|
super().__init__()
|
|
self.buffer = buffer
|
|
|
|
def emit(self, record: logging.LogRecord) -> None:
|
|
try:
|
|
self.buffer.append(self.format(record))
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _update_task_state(task: Task, state: str, meta: dict[str, Any]) -> None:
|
|
"""Call task.update_state with logs injected from the active log buffer."""
|
|
if _active_log_buffer is not None:
|
|
meta["logs"] = list(_active_log_buffer)
|
|
task.update_state(state=state, meta=meta)
|
|
if hasattr(task, 'request') and task.request.id:
|
|
publish_task_progress(task.request.id, state, meta)
|
|
|
|
|
|
async def _fetch_subquery(
|
|
sq: SubQuery,
|
|
parameters: QueryParameters,
|
|
session: object,
|
|
config: ScraperConfig,
|
|
semaphore: asyncio.Semaphore,
|
|
existing_ids: set[int],
|
|
queue: asyncio.Queue[int | None],
|
|
state: _PipelineState,
|
|
reporter: ProgressReporter,
|
|
) -> None:
|
|
"""Fetch pages for a single subquery and enqueue new listing IDs."""
|
|
estimated = sq.estimated_results or 0
|
|
if estimated == 0:
|
|
state.completed_subqueries += 1
|
|
reporter.notify()
|
|
return
|
|
|
|
page_size = parameters.page_size
|
|
max_pages = min(
|
|
config.max_pages_per_query,
|
|
(estimated // page_size) + 1,
|
|
)
|
|
|
|
for page_id in range(1, max_pages + 1):
|
|
async with semaphore:
|
|
await asyncio.sleep(config.request_delay_ms / 1000)
|
|
try:
|
|
result = await listing_query(
|
|
page=page_id,
|
|
channel=parameters.listing_type,
|
|
min_bedrooms=sq.min_bedrooms,
|
|
max_bedrooms=sq.max_bedrooms,
|
|
radius=parameters.radius,
|
|
min_price=sq.min_price,
|
|
max_price=sq.max_price,
|
|
district=sq.district,
|
|
page_size=page_size,
|
|
max_days_since_added=parameters.max_days_since_added,
|
|
furnish_types=parameters.furnish_types or [],
|
|
session=session,
|
|
config=config,
|
|
)
|
|
state.total_pages_fetched += 1
|
|
reporter.notify()
|
|
|
|
properties = result.get("properties", [])
|
|
for prop in properties:
|
|
identifier = prop.get("identifier")
|
|
if identifier and identifier not in existing_ids:
|
|
existing_ids.add(identifier)
|
|
state.ids_collected += 1
|
|
await queue.put(identifier)
|
|
|
|
if len(properties) < page_size:
|
|
break
|
|
|
|
except CircuitBreakerOpenError as e:
|
|
celery_logger.error(f"Circuit breaker open: {e}")
|
|
break
|
|
except ThrottlingError as e:
|
|
celery_logger.warning(
|
|
f"Throttling on {sq.district} page {page_id}: {e}"
|
|
)
|
|
break
|
|
except InvalidResponseError:
|
|
celery_logger.debug(
|
|
f"Max page for {sq.district}: {page_id - 1}"
|
|
)
|
|
break
|
|
except Exception as e:
|
|
celery_logger.warning(
|
|
f"Error fetching page {page_id} for "
|
|
f"{sq.district}: {e}"
|
|
)
|
|
break
|
|
|
|
state.completed_subqueries += 1
|
|
reporter.notify()
|
|
|
|
|
|
async def _process_worker(
|
|
queue: asyncio.Queue[int | None],
|
|
processor: ListingProcessor,
|
|
state: _PipelineState,
|
|
reporter: ProgressReporter,
|
|
) -> None:
|
|
"""Consumer worker: pull listing IDs from the queue and process them."""
|
|
while True:
|
|
listing_id = await queue.get()
|
|
if listing_id is None:
|
|
break
|
|
|
|
def step_callback(step_name: str) -> None:
|
|
if step_name == "details":
|
|
state.details_fetched += 1
|
|
elif step_name == "images":
|
|
state.images_downloaded += 1
|
|
elif step_name == "ocr":
|
|
state.ocr_completed += 1
|
|
|
|
listing = await processor.process_listing(
|
|
listing_id, on_step_complete=step_callback
|
|
)
|
|
if listing is not None:
|
|
state.processed_count += 1
|
|
state.processed_listings.append(listing)
|
|
else:
|
|
state.failed_count += 1
|
|
reporter.notify()
|
|
|
|
|
|
|
|
@app.task(bind=True, pydantic=True)
|
|
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
|
|
with redis_lock(SCRAPE_LOCK_NAME) as acquired:
|
|
if not acquired:
|
|
msg = "Another scrape job is already running, skipping this execution"
|
|
celery_logger.warning(msg)
|
|
meta = {"reason": "Another scrape job is running"}
|
|
self.update_state(state="SKIPPED", meta=meta)
|
|
publish_task_progress(self.request.id, "SKIPPED", meta)
|
|
return {"status": "skipped", "reason": "another_job_running"}
|
|
|
|
celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
|
|
|
|
parsed_parameters = QueryParameters.model_validate_json(parameters_json)
|
|
celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}")
|
|
|
|
self.update_state(state="Starting...", meta={"phase": PHASE_SPLITTING, "progress": 0})
|
|
publish_task_progress(self.request.id, "Starting...", {"phase": PHASE_SPLITTING, "progress": 0})
|
|
asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
|
|
result = {"phase": PHASE_COMPLETED, "progress": 1}
|
|
publish_task_progress(self.request.id, "SUCCESS", result)
|
|
return result
|
|
|
|
|
|
async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
|
|
with redis_lock(SCRAPE_LOCK_NAME) as acquired:
|
|
if not acquired:
|
|
celery_logger.warning("Another scrape job is already running, skipping this execution")
|
|
return {"status": "skipped", "reason": "another_job_running"}
|
|
|
|
celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
|
|
parsed_parameters = QueryParameters.model_validate_json(parameters_json)
|
|
await dump_listings_full(task=Task(), parameters=parsed_parameters)
|
|
return {"progress": 0}
|
|
|
|
|
|
async def dump_listings_full(
|
|
*, task: Task, parameters: QueryParameters
|
|
) -> list[Listing]:
|
|
"""Fetches all listings, images as well as detects floorplans"""
|
|
global _active_log_buffer
|
|
|
|
# Set up log capture into a module-level buffer so _update_task_state
|
|
# can inject logs into every state update.
|
|
log_buffer: deque[str] = deque(maxlen=LOG_BUFFER_MAX_LINES)
|
|
log_handler = TaskLogHandler(log_buffer)
|
|
log_handler.setFormatter(
|
|
logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
|
|
)
|
|
|
|
# Attach handler to both loggers used in the codebase, and ensure
|
|
# they accept INFO-level messages (Celery's worker setup may leave
|
|
# the celery.task logger at WARNING).
|
|
_prev_celery_level = celery_logger.level
|
|
_prev_logger_level = logger.level
|
|
celery_logger.addHandler(log_handler)
|
|
logger.addHandler(log_handler)
|
|
if celery_logger.level == logging.NOTSET or celery_logger.level > logging.INFO:
|
|
celery_logger.setLevel(logging.INFO)
|
|
if logger.level == logging.NOTSET or logger.level > logging.INFO:
|
|
logger.setLevel(logging.INFO)
|
|
|
|
_active_log_buffer = log_buffer
|
|
|
|
try:
|
|
return await _dump_listings_full_inner(task=task, parameters=parameters)
|
|
finally:
|
|
_active_log_buffer = None
|
|
celery_logger.removeHandler(log_handler)
|
|
logger.removeHandler(log_handler)
|
|
celery_logger.setLevel(_prev_celery_level)
|
|
logger.setLevel(_prev_logger_level)
|
|
|
|
|
|
async def _dump_listings_full_inner(
|
|
*, task: Task, parameters: QueryParameters
|
|
) -> list[Listing]:
|
|
"""Inner implementation with log capture active.
|
|
|
|
Uses a streaming pipeline: an asyncio.Queue bridges the fetcher (producer)
|
|
and processor workers (consumers) so that listing processing starts as
|
|
soon as IDs become available from each subquery.
|
|
"""
|
|
start_time = time.time()
|
|
state = _PipelineState()
|
|
|
|
celery_logger.info("=" * 60)
|
|
celery_logger.info(f"PHASE 1: Splitting queries")
|
|
celery_logger.info("=" * 60)
|
|
|
|
repository = ListingRepository(engine)
|
|
config = ScraperConfig.from_env()
|
|
splitter = QuerySplitter(config)
|
|
|
|
reset_throttle_metrics()
|
|
|
|
def on_progress(phase: str, message: str, **kwargs: Any) -> None:
|
|
meta: dict[str, Any] = {"phase": phase, "message": message}
|
|
meta.update(kwargs)
|
|
_update_task_state(task, message, meta)
|
|
celery_logger.info(f"[{phase}] {message}")
|
|
|
|
_update_task_state(task, "Analyzing query and splitting by price bands...", {
|
|
"phase": PHASE_SPLITTING, "progress": 0,
|
|
})
|
|
celery_logger.info("Starting query splitting and probing...")
|
|
|
|
try:
|
|
async with create_session(config) as session:
|
|
subqueries = await splitter.split(parameters, session, on_progress)
|
|
|
|
total_estimated = splitter.calculate_total_estimated_results(subqueries)
|
|
celery_logger.info(
|
|
f"Query split complete: {len(subqueries)} subqueries, "
|
|
f"~{total_estimated} estimated total results"
|
|
)
|
|
|
|
celery_logger.info("Loading existing listing IDs from database...")
|
|
existing_ids = repository.get_listing_ids(parameters.listing_type)
|
|
celery_logger.info(f"Found {len(existing_ids)} existing listings in DB")
|
|
|
|
celery_logger.info("=" * 60)
|
|
celery_logger.info(f"PHASE 2: Streaming fetch & process")
|
|
celery_logger.info("=" * 60)
|
|
|
|
queue: asyncio.Queue[int | None] = asyncio.Queue()
|
|
semaphore = asyncio.Semaphore(config.max_concurrent_requests)
|
|
|
|
_update_task_state(
|
|
task,
|
|
f"Fetching listings from {len(subqueries)} subqueries...",
|
|
{
|
|
"phase": PHASE_FETCHING,
|
|
"subqueries_completed": 0,
|
|
"subqueries_total": len(subqueries),
|
|
"ids_collected": 0,
|
|
"pages_fetched": 0,
|
|
"estimated_results": total_estimated,
|
|
"fetching_done": False,
|
|
},
|
|
)
|
|
|
|
listing_processor = ListingProcessor(repository, parameters.listing_type)
|
|
|
|
reporter = ProgressReporter(
|
|
task, state, len(subqueries), start_time,
|
|
)
|
|
|
|
# Producer: fetch all subqueries concurrently, then signal workers to stop
|
|
async def producer() -> None:
|
|
await asyncio.gather(
|
|
*[
|
|
_fetch_subquery(
|
|
sq, parameters, session, config,
|
|
semaphore, existing_ids, queue, state,
|
|
reporter,
|
|
)
|
|
for sq in subqueries
|
|
]
|
|
)
|
|
|
|
celery_logger.info(
|
|
f"Fetch complete: {state.total_pages_fetched} pages from "
|
|
f"{state.completed_subqueries} subqueries, "
|
|
f"{state.ids_collected} new IDs"
|
|
)
|
|
state.fetching_done = True
|
|
reporter.notify(force=True)
|
|
|
|
for _ in range(NUM_WORKERS):
|
|
await queue.put(None)
|
|
|
|
await asyncio.gather(
|
|
producer(),
|
|
*[_process_worker(queue, listing_processor, state, reporter) for _ in range(NUM_WORKERS)],
|
|
reporter.run_background_flush(),
|
|
)
|
|
|
|
reporter.flush()
|
|
|
|
except CircuitBreakerOpenError as e:
|
|
celery_logger.error(f"Circuit breaker prevented query: {e}")
|
|
metrics = get_throttle_metrics()
|
|
if metrics.total_requests > 0:
|
|
celery_logger.info(metrics.summary())
|
|
return []
|
|
finally:
|
|
metrics = get_throttle_metrics()
|
|
if metrics.total_requests > 0:
|
|
celery_logger.info(
|
|
f"API Stats: {metrics.total_requests} requests, "
|
|
f"avg {metrics.average_response_time:.2f}s, "
|
|
f"{metrics.total_throttling_events} throttled"
|
|
)
|
|
|
|
elapsed = time.time() - start_time
|
|
celery_logger.info("=" * 60)
|
|
celery_logger.info(
|
|
f"COMPLETED: Processed {len(state.processed_listings)} listings in {elapsed:.1f}s"
|
|
)
|
|
celery_logger.info("=" * 60)
|
|
|
|
# Record scrape metrics
|
|
from api.metrics import (
|
|
scrape_listings_found,
|
|
scrape_listings_processed,
|
|
scrape_listings_failed,
|
|
scrape_duration_seconds,
|
|
scrape_pages_fetched,
|
|
scrape_subqueries_total as scrape_subqueries_metric,
|
|
)
|
|
scrape_listings_found.add(state.ids_collected)
|
|
scrape_listings_processed.add(state.processed_count)
|
|
scrape_listings_failed.add(state.failed_count)
|
|
scrape_duration_seconds.record(elapsed)
|
|
scrape_pages_fetched.add(state.total_pages_fetched)
|
|
scrape_subqueries_metric.add(state.completed_subqueries)
|
|
|
|
invalidate_cache()
|
|
|
|
_update_task_state(task, "Completed", {
|
|
"phase": PHASE_COMPLETED, "progress": 1,
|
|
"processed": len(state.processed_listings), "total": state.ids_collected,
|
|
"message": f"Processed {len(state.processed_listings)} listings in {elapsed:.0f}s",
|
|
})
|
|
|
|
return state.processed_listings
|
|
|
|
|
|
@app.on_after_finalize.connect
|
|
def setup_periodic_tasks(sender, **kwargs):
|
|
"""Register periodic tasks from environment configuration."""
|
|
try:
|
|
config = SchedulesConfig.from_env()
|
|
except ValueError as e:
|
|
celery_logger.error(f"Failed to load schedule configuration: {e}")
|
|
return
|
|
|
|
for schedule in config.get_enabled_schedules():
|
|
celery_logger.info(
|
|
f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
|
|
)
|
|
|
|
sender.add_periodic_task(
|
|
crontab(
|
|
minute=schedule.minute,
|
|
hour=schedule.hour,
|
|
day_of_week=schedule.day_of_week,
|
|
),
|
|
dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
|
|
name=schedule.name,
|
|
)
|