wrongmove/tasks/listing_tasks.py
Viktor Barzin a42944a756 wrongmove: round-3 fix sweep — scrape pipeline, BUY tab, filter URL state, render hygiene, map polish
Coordinated fix across 31 bugs found in a parallel QA pass. Findings docs at /tmp/wrongmove-bugs/qa-round-3/qa{1,2,3,4}-*.md.

## Backend / scrape (Fix-1) — 8 bugs

- B1 [P0] Scrape totally broken on prod: pod UID 100 vs NFS dir 1000:1000 mode 775 → PermissionError on every never-seen listing. Switched Dockerfile to explicit `useradd --uid 1000 --gid 1000`; added securityContext + chown initContainer to k8s/{api,celery-beat}-deployment.yaml. Celery worker manifest lives outside this repo — Dockerfile UID change is the load-bearing fix.
- B4 [P1] Celery broker reaped every ~30s by Redis HAProxy idle timeout. Added `broker_transport_options` / `result_backend_transport_options` with `socket_keepalive=True, health_check_interval=25` in celery_app.py + same kwargs on every redis.from_url/Redis call across services/, utils/redis_lock.py, redis_repository.py.
- B5 [P1] dump_listings_task never published terminal FAILURE to the task_progress pub/sub channel — UI polled forever. Wrap body in try/except that publishes FAILURE before re-raising.
- B6 [P1] _process_worker had no per-listing exception handler — one bad listing killed the whole scrape via asyncio.gather. Wrap loop body in try/except Exception (re-raises CancelledError).
- B20 [P2] dump_listings_task gained time_limit=3600, soft_time_limit=3500, acks_late=True.
- B21 [P2] RedisRepository moved off shared db0 (was alongside paperless-ngx) to db3 via REDIS_USER_DB env var; keys prefixed `wrongmove:user:`.
- B32 [P3] redis_lock now uses uuid4() owner token + Lua compare-and-delete.
- B33 [P3] Slack notify in refresh_listings → asyncio.create_task (fire-and-forget).

## Frontend filter system (Fix-2) — 7 bugs

- B2 [P0] BUY tab click triggered "Maximum update depth exceeded" → ErrorBoundary. Replaced the three mutually-triggering useEffects in FilterBar with a single one-way controlled-value flow (URL → parent state → form), guarded by previousListingTypeRef so price-defaults fires once per real transition.
- B3 [P0] Filter values never reached the URL. Wired useFilterParams.setFilterValues into FilterBar/FilterPanel onSubmit + handleRemoveChip + new handleResetAllFilters; fed parsed filterValues into both forms' defaultValues; added URL→form sync via form.reset on browser back/forward.
- B8 [P1] Chip removal now resets form state via new FilterBar onFormReady callback — More badge no longer sticks.
- B12 [P2] Desktop swipe-review FAB added next to header (mobile FAB unchanged).
- B17 [P2] "Reset all" affordance on chip strip.
- B22 [P2] formatPrice precision: 1500 → £1.5k, 2500 → £2.5k (no longer collides with £2k/£3k defaults).
- B30 [P3] last_seen_days input gained min={0}.
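
`formatPrice` itself is TypeScript. The B22 rounding rule is small enough to show as a Python transliteration. This sketch assumes values of 1000 and up are shown in thousands with one decimal place and any trailing `.0` dropped. The sub-1000 branch and the name `format_price` are assumptions, not the frontend's actual code.

```python
def format_price(value: float) -> str:
    """Compact GBP label, e.g. 1500 -> "£1.5k", 2000 -> "£2k", 950 -> "£950"."""
    if abs(value) >= 1000:
        thousands = value / 1000
        # One decimal place keeps 1500 and 2500 distinct from 2000 and 3000;
        # stripping a trailing ".0" keeps round values short ("£2k").
        text = f"{thousands:.1f}".rstrip("0").rstrip(".")
        return f"£{text}k"
    return f"£{value:.0f}"
```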

## Frontend render hygiene + data integrity (Fix-3) — 8 bugs

- B7 [P1] streamingService bails on first non-NDJSON chunk (HTML response = backend down) and throws StreamParseError so the existing AlertError dialog surfaces a single user-visible error instead of 18× console.error spam.
- B9 [P1] formatDuration widened to (null|undefined|number): returns "—" for non-finite or negative, caps implausibly large values.
- B10 [P1] PropertyCard / PropertyCardCompact / SwipeCard JSX leaves render "—" for null total_price/qm/qmprice (was "£0/0 m²/£0/m²" — looked like free listings).
- B13 [P2] hexgrid worker reduceAverage uses Number.isFinite filter instead of !isNaN (which incorrectly accepted null → 0, biasing per-hex averages low).
- B14 [P2] ListingDetail Overview wraps agency in "Listed by" labelled block so it can't collapse to a bare agency name.
- B15 [P2] Compact POIDistanceBadges iterates all three travel modes with "—" for missing, matching the detail-sheet Travel table.
- B24 [P3] Drawer.Description (sr-only) added to ListingDetailSheet + MobileBottomSheet to silence Radix a11y warning.
- B25 [P3] lastSeenDays clamped to ≥0 so future timestamps don't render as "-7d ago".

## Frontend map / carousel / tasks polish (Fix-4) — 8 bugs

- B11 [P2] HexgridHeatmapClient destroy race: Map.tsx adds .catch() + ref guard so post-destroy promise rejections are silent no-ops. Verified by a browser smoke test (24 rapid Map↔List toggles → 0 pageErrors).
- B16 [P2] PhotoCarousel + inner CardCarousel gained keyboard nav (Arrow keys).
- B18 [P2] Default map center moved from Czech Republic to London (zoom 10).
- B19+B29 [P2/P3] Mapbox token: no longer hard-coded fallback; reads env-only and shows a clear "Map unavailable — set VITE_MAPBOX_TOKEN" banner when missing.
- B23 [P3] PhotoCarousel suppresses "1/1" counter for single-photo listings; added onError fallback for broken URLs.
- B26 [P3] PhotoCarousel only enables loop when photos.length > 1.
- B27 [P3] TaskIndicator cancel/clear-all buttons gained aria-label + data-testid.
- B28 [P3] useTaskProgress strips terminal-local task IDs from the polling union — no more forever-poll on completed tasks.
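
`useTaskProgress` is frontend code. Below is a Python sketch of the set logic B28 describes: poll the union of server-known and locally started task IDs, minus anything already terminal. The function name and the terminal-state set are assumptions. The set used here is the three states `listing_tasks.py` publishes plus Celery's built-in `REVOKED`.

```python
from collections.abc import Iterable, Mapping

# Assumed terminal set: SUCCESS / FAILURE / SKIPPED as published by
# listing_tasks.py, plus Celery's built-in REVOKED.
TERMINAL_STATES = frozenset({"SUCCESS", "FAILURE", "SKIPPED", "REVOKED"})


def ids_to_poll(
    server_ids: Iterable[str],
    local_ids: Iterable[str],
    last_state: Mapping[str, str],
) -> set[str]:
    """Task IDs still worth polling.

    Takes the union of server-reported and locally started IDs, then drops
    any ID whose last observed state is terminal, so a finished task stops
    being polled even while it is still in the local list.
    """
    union = set(server_ids) | set(local_ids)
    return {tid for tid in union if last_state.get(tid) not in TERMINAL_STATES}
```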

## Tests

74 new vitest tests + 18 new pytest tests. Local: tsc clean, 201 vitest tests pass, 633 pytest tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 22:27:29 +00:00


import asyncio
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any
from celery import Task
from celery.schedules import crontab
from celery_app import app
from config.schedule_config import SchedulesConfig
from config.scraper_config import ScraperConfig
from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters
from rec.query import create_session, listing_query
from rec.exceptions import CircuitBreakerOpenError, InvalidResponseError, ThrottlingError
from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
from repositories.listing_repository import ListingRepository
from database import engine
from services.query_splitter import QuerySplitter, SubQuery
from utils.redis_lock import redis_lock
from services.listing_cache import invalidate_cache
from services.task_progress_publisher import publish_task_progress

logger = logging.getLogger("uvicorn.error")
# Central logging is configured in celery_app.py via logging_config.
celery_logger = logging.getLogger("celery.task")

SCRAPE_LOCK_NAME = "scrape_listings"
LOG_BUFFER_MAX_LINES = 200

# Number of concurrent consumer workers that process listings from the queue.
NUM_WORKERS = 20

# Phase constants for task state reporting
PHASE_SPLITTING = "splitting"
PHASE_FETCHING = "fetching"
PHASE_PROCESSING = "processing"
PHASE_COMPLETED = "completed"

# Module-level log buffer, active only while a task is executing.
# Module-level mutable state is safe here only under Celery's prefork pool
# (the default): each worker process runs one task at a time, so nothing in
# the same process touches the buffer concurrently. A threads, gevent or
# eventlet pool would break this assumption. TaskLogHandler appends to the
# buffer; _update_task_state reads from it.
_active_log_buffer: deque[str] | None = None


@dataclass
class _PipelineState:
    """Shared mutable state for the streaming fetch-and-process pipeline."""

    ids_collected: int = 0
    completed_subqueries: int = 0
    total_pages_fetched: int = 0
    fetching_done: bool = False
    processed_count: int = 0
    failed_count: int = 0
    details_fetched: int = 0
    images_downloaded: int = 0
    ocr_completed: int = 0
    processed_listings: list[Listing] = field(default_factory=list)


class ProgressReporter:
    """Event-driven, throttled progress reporter.

    Call ``notify()`` whenever pipeline state changes (page fetched, item
    processed, phase change). Publishes are throttled to at most one every
    ``min_interval`` seconds to avoid flooding Redis / WebSocket. Use
    ``force=True`` for phase changes, completion, and errors so they are
    published immediately.

    ``run_background_flush`` is a long-running coroutine that publishes every
    ``background_interval`` seconds during quiet periods (keeps ETA / elapsed
    fields current even when no items are being processed).
    """

    def __init__(
        self,
        task: Task,
        state: _PipelineState,
        subqueries_total: int,
        start_time: float,
        min_interval: float = 0.25,
        background_interval: float = 2.0,
    ) -> None:
        self._task = task
        self._state = state
        self._subqueries_total = subqueries_total
        self._start_time = start_time
        self._min_interval = min_interval
        self._background_interval = background_interval
        self._last_publish: float = 0.0
        self._last_log_progress: float = 0.0
        self._dirty = False
        self._deferred: asyncio.TimerHandle | None = None
        self._loop: asyncio.AbstractEventLoop | None = None
        self._stopped = False

    # -- internal helpers ---------------------------------------------------

    def _build_meta(self) -> tuple[str, dict[str, Any]]:
        state = self._state
        total = state.ids_collected
        done = state.processed_count + state.failed_count
        phase = PHASE_PROCESSING if state.fetching_done else PHASE_FETCHING
        progress_ratio = round(done / total, 2) if total > 0 else 0.0
        elapsed = time.time() - self._start_time
        rate = done / elapsed if elapsed > 0 else 0
        remaining = (total - done) if total > 0 else 0
        eta = remaining / rate if rate > 0 else 0
        label = (
            f"{'Processing' if state.fetching_done else 'Fetching & processing'}: "
            f"{done}/{total}"
        )
        meta: dict[str, Any] = {
            "phase": phase,
            "progress": progress_ratio,
            "processed": done,
            "total": total,
            "subqueries_completed": state.completed_subqueries,
            "subqueries_total": self._subqueries_total,
            "ids_collected": state.ids_collected,
            "pages_fetched": state.total_pages_fetched,
            "fetching_done": state.fetching_done,
            "details_fetched": state.details_fetched,
            "images_downloaded": state.images_downloaded,
            "ocr_completed": state.ocr_completed,
            "failed": state.failed_count,
            "elapsed_seconds": round(elapsed, 1),
            "rate_per_second": round(rate, 2),
            "eta_seconds": round(eta, 1),
        }
        return label, meta

    def _publish(self) -> None:
        label, meta = self._build_meta()
        # Log a milestone at every 10% of progress (and on the first item)
        progress_ratio = meta["progress"]
        if progress_ratio >= self._last_log_progress + 0.1 or meta["processed"] == 1:
            done = meta["processed"]
            total = meta["total"]
            elapsed = meta["elapsed_seconds"]
            rate = meta["rate_per_second"]
            eta = meta["eta_seconds"]
            celery_logger.info(
                f"Progress: {progress_ratio * 100:.0f}% "
                f"({done}/{total}) "
                f"| Elapsed: {elapsed:.0f}s "
                f"| Rate: {rate:.1f}/s "
                f"| ETA: {eta:.0f}s"
            )
            self._last_log_progress = progress_ratio
        _update_task_state(self._task, label, meta)
        self._last_publish = time.time()
        self._dirty = False

    def _cancel_deferred(self) -> None:
        if self._deferred is not None:
            self._deferred.cancel()
            self._deferred = None

    def _deferred_callback(self) -> None:
        """Called by asyncio.call_later when the throttle window expires."""
        self._deferred = None
        if self._dirty and not self._stopped:
            self._publish()

    # -- public API ---------------------------------------------------------

    def notify(self, force: bool = False) -> None:
        """Signal that pipeline state has changed.

        If ``force`` is True the publish happens immediately (use for phase
        changes and completion). Otherwise the publish is throttled.
        Must be called from code running inside the event loop.
        """
        if self._stopped:
            return
        self._dirty = True
        now = time.time()
        if force or (now - self._last_publish) >= self._min_interval:
            self._cancel_deferred()
            self._publish()
        elif self._deferred is None:
            # Schedule a deferred flush so trailing updates aren't lost.
            # get_running_loop() is preferred over get_event_loop() in
            # coroutine code and raises RuntimeError if no loop is running.
            loop = self._loop or asyncio.get_running_loop()
            delay = self._min_interval - (now - self._last_publish)
            self._deferred = loop.call_later(delay, self._deferred_callback)

    def flush(self) -> None:
        """Force a final publish (call at pipeline end)."""
        self._cancel_deferred()
        self._publish()

    async def run_background_flush(self) -> None:
        """Background keepalive: publish every ``background_interval`` seconds.

        This keeps ETA / elapsed fields current during quiet periods.
        Exits when the pipeline is done (fetching complete and all items
        processed).
        """
        self._loop = asyncio.get_running_loop()
        while True:
            state = self._state
            done = state.processed_count + state.failed_count
            # Also covers ids_collected == 0: nothing was queued, so stop.
            if state.fetching_done and done >= state.ids_collected:
                break
            await asyncio.sleep(self._background_interval)
            # Publish if nothing else has published recently
            if not self._stopped:
                now = time.time()
                if (now - self._last_publish) >= self._background_interval:
                    self._publish()
        self._stopped = True
        self._cancel_deferred()


class TaskLogHandler(logging.Handler):
    """Captures log records into a deque for inclusion in task state updates."""

    def __init__(self, buffer: deque[str]) -> None:
        super().__init__()
        self.buffer = buffer

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.buffer.append(self.format(record))
        except Exception:
            # Log capture is best-effort; never let it break the task.
            pass


def _update_task_state(task: Task, state: str, meta: dict[str, Any]) -> None:
    """Call task.update_state with logs injected from the active log buffer.

    ``meta`` is mutated in place: a ``logs`` key is added while capture is
    active.
    """
    if _active_log_buffer is not None:
        meta["logs"] = list(_active_log_buffer)
    task.update_state(state=state, meta=meta)
    if hasattr(task, 'request') and task.request.id:
        publish_task_progress(task.request.id, state, meta)


async def _fetch_subquery(
    sq: SubQuery,
    parameters: QueryParameters,
    session: object,
    config: ScraperConfig,
    semaphore: asyncio.Semaphore,
    existing_ids: set[int],
    queue: asyncio.Queue[int | None],
    state: _PipelineState,
    reporter: ProgressReporter,
) -> None:
    """Fetch pages for a single subquery and enqueue new listing IDs."""
    estimated = sq.estimated_results or 0
    if estimated == 0:
        state.completed_subqueries += 1
        reporter.notify()
        return
    page_size = parameters.page_size
    max_pages = min(
        config.max_pages_per_query,
        (estimated // page_size) + 1,
    )
    for page_id in range(1, max_pages + 1):
        async with semaphore:
            await asyncio.sleep(config.request_delay_ms / 1000)
            try:
                result = await listing_query(
                    page=page_id,
                    channel=parameters.listing_type,
                    min_bedrooms=sq.min_bedrooms,
                    max_bedrooms=sq.max_bedrooms,
                    radius=parameters.radius,
                    min_price=sq.min_price,
                    max_price=sq.max_price,
                    district=sq.district,
                    page_size=page_size,
                    max_days_since_added=parameters.max_days_since_added,
                    furnish_types=parameters.furnish_types or [],
                    session=session,
                    config=config,
                )
                state.total_pages_fetched += 1
                reporter.notify()
                properties = result.get("properties", [])
                for prop in properties:
                    identifier = prop.get("identifier")
                    if identifier and identifier not in existing_ids:
                        existing_ids.add(identifier)
                        state.ids_collected += 1
                        await queue.put(identifier)
                # A short page means there are no further results.
                if len(properties) < page_size:
                    break
            except CircuitBreakerOpenError as e:
                celery_logger.error(f"Circuit breaker open: {e}")
                break
            except ThrottlingError as e:
                celery_logger.warning(
                    f"Throttling on {sq.district} page {page_id}: {e}"
                )
                break
            except InvalidResponseError:
                celery_logger.debug(
                    f"Max page for {sq.district}: {page_id - 1}"
                )
                break
            except Exception as e:
                celery_logger.warning(
                    f"Error fetching page {page_id} for "
                    f"{sq.district}: {e}"
                )
                break
    state.completed_subqueries += 1
    reporter.notify()


async def _process_worker(
    queue: asyncio.Queue[int | None],
    processor: ListingProcessor,
    state: _PipelineState,
    reporter: ProgressReporter,
) -> None:
    """Consumer worker: pull listing IDs from the queue and process them.

    Per-listing exceptions (PermissionError, OSError, asyncio.TimeoutError,
    etc.) are caught, counted, logged, and skipped so a single bad listing
    cannot abort the entire scrape pipeline. CancelledError is re-raised so
    cooperative cancellation still works.
    """

    def step_callback(step_name: str) -> None:
        # Per-step counters surfaced in the progress meta.
        if step_name == "details":
            state.details_fetched += 1
        elif step_name == "images":
            state.images_downloaded += 1
        elif step_name == "ocr":
            state.ocr_completed += 1

    while True:
        listing_id = await queue.get()
        if listing_id is None:
            # Sentinel from the producer: no more IDs are coming.
            break
        try:
            listing = await processor.process_listing(
                listing_id, on_step_complete=step_callback
            )
            if listing is not None:
                state.processed_count += 1
                state.processed_listings.append(listing)
            else:
                state.failed_count += 1
        except asyncio.CancelledError:
            # Cooperative cancellation: let it propagate so the gather()
            # in _dump_listings_full_inner can unwind cleanly.
            raise
        except Exception:
            state.failed_count += 1
            celery_logger.exception(
                "Unhandled exception processing listing %s; skipping", listing_id
            )
        reporter.notify()


@app.task(
    bind=True,
    pydantic=True,
    # Hard kill if the task hasn't completed in 1h (matches the SLA; a full
    # scrape takes ~10-15 min in practice). soft_time_limit fires first and
    # raises SoftTimeLimitExceeded, so cleanup and the FAILURE publish run
    # before the hard kill.
    time_limit=3600,
    soft_time_limit=3500,
    # acks_late: a worker crash mid-task re-queues the task instead of
    # acknowledging and losing it. Combined with the default
    # visibility_timeout this is safe, because the redis_lock at the top of
    # the body makes a redelivered duplicate skip rather than run
    # concurrently.
    acks_late=True,
)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            msg = "Another scrape job is already running, skipping this execution"
            celery_logger.warning(msg)
            meta = {"reason": "Another scrape job is running"}
            self.update_state(state="SKIPPED", meta=meta)
            publish_task_progress(self.request.id, "SKIPPED", meta)
            return {"status": "skipped", "reason": "another_job_running"}
        celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}")
        self.update_state(state="Starting...", meta={"phase": PHASE_SPLITTING, "progress": 0})
        publish_task_progress(self.request.id, "Starting...", {"phase": PHASE_SPLITTING, "progress": 0})
        try:
            asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
        except Exception as exc:
            # Publish a terminal FAILURE event so WebSocket subscribers update
            # immediately, then re-raise so Celery records the failure in
            # the result backend.
            celery_logger.exception("dump_listings_task failed: %s", exc)
            publish_task_progress(
                self.request.id,
                "FAILURE",
                {
                    "phase": PHASE_COMPLETED,
                    "progress": 0,
                    "error": str(exc),
                    "exc_type": type(exc).__name__,
                },
            )
            raise
        result = {"phase": PHASE_COMPLETED, "progress": 1}
        publish_task_progress(self.request.id, "SUCCESS", result)
        return result


async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            celery_logger.warning("Another scrape job is already running, skipping this execution")
            return {"status": "skipped", "reason": "another_job_running"}
        celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        await dump_listings_full(task=Task(), parameters=parsed_parameters)
        return {"progress": 0}


async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Fetch all listings and their images, and detect floorplans.

    Wraps ``_dump_listings_full_inner`` with log capture so that every task
    state update carries the most recent log lines.
    """
    global _active_log_buffer
    # Set up log capture into a module-level buffer so _update_task_state
    # can inject logs into every state update.
    log_buffer: deque[str] = deque(maxlen=LOG_BUFFER_MAX_LINES)
    log_handler = TaskLogHandler(log_buffer)
    log_handler.setFormatter(
        logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
    )
    # Attach handler to both loggers used in the codebase, and ensure
    # they accept INFO-level messages (Celery's worker setup may leave
    # the celery.task logger at WARNING).
    _prev_celery_level = celery_logger.level
    _prev_logger_level = logger.level
    celery_logger.addHandler(log_handler)
    logger.addHandler(log_handler)
    if celery_logger.level == logging.NOTSET or celery_logger.level > logging.INFO:
        celery_logger.setLevel(logging.INFO)
    if logger.level == logging.NOTSET or logger.level > logging.INFO:
        logger.setLevel(logging.INFO)
    _active_log_buffer = log_buffer
    try:
        return await _dump_listings_full_inner(task=task, parameters=parameters)
    finally:
        # Restore global logging state even if the pipeline raised.
        _active_log_buffer = None
        celery_logger.removeHandler(log_handler)
        logger.removeHandler(log_handler)
        celery_logger.setLevel(_prev_celery_level)
        logger.setLevel(_prev_logger_level)


async def _dump_listings_full_inner(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Inner implementation with log capture active.

    Uses a streaming pipeline: an asyncio.Queue bridges the fetcher (producer)
    and processor workers (consumers) so that listing processing starts as
    soon as IDs become available from each subquery.
    """
    start_time = time.time()
    state = _PipelineState()

    celery_logger.info("=" * 60)
    celery_logger.info("PHASE 1: Splitting queries")
    celery_logger.info("=" * 60)

    repository = ListingRepository(engine)
    config = ScraperConfig.from_env()
    splitter = QuerySplitter(config)
    reset_throttle_metrics()

    def on_progress(phase: str, message: str, **kwargs: Any) -> None:
        meta: dict[str, Any] = {"phase": phase, "message": message}
        meta.update(kwargs)
        _update_task_state(task, message, meta)
        celery_logger.info(f"[{phase}] {message}")

    _update_task_state(task, "Analyzing query and splitting by price bands...", {
        "phase": PHASE_SPLITTING, "progress": 0,
    })
    celery_logger.info("Starting query splitting and probing...")
    try:
        async with create_session(config) as session:
            subqueries = await splitter.split(parameters, session, on_progress)
            total_estimated = splitter.calculate_total_estimated_results(subqueries)
            celery_logger.info(
                f"Query split complete: {len(subqueries)} subqueries, "
                f"~{total_estimated} estimated total results"
            )

            celery_logger.info("Loading existing listing IDs from database...")
            existing_ids = repository.get_listing_ids(parameters.listing_type)
            celery_logger.info(f"Found {len(existing_ids)} existing listings in DB")

            celery_logger.info("=" * 60)
            celery_logger.info("PHASE 2: Streaming fetch & process")
            celery_logger.info("=" * 60)

            queue: asyncio.Queue[int | None] = asyncio.Queue()
            semaphore = asyncio.Semaphore(config.max_concurrent_requests)
            _update_task_state(
                task,
                f"Fetching listings from {len(subqueries)} subqueries...",
                {
                    "phase": PHASE_FETCHING,
                    "subqueries_completed": 0,
                    "subqueries_total": len(subqueries),
                    "ids_collected": 0,
                    "pages_fetched": 0,
                    "estimated_results": total_estimated,
                    "fetching_done": False,
                },
            )

            listing_processor = ListingProcessor(repository, parameters.listing_type)
            reporter = ProgressReporter(
                task, state, len(subqueries), start_time,
            )

            # Producer: fetch all subqueries concurrently, then put one None
            # sentinel per worker on the queue so every consumer exits.
            async def producer() -> None:
                await asyncio.gather(
                    *[
                        _fetch_subquery(
                            sq, parameters, session, config,
                            semaphore, existing_ids, queue, state,
                            reporter,
                        )
                        for sq in subqueries
                    ]
                )
                celery_logger.info(
                    f"Fetch complete: {state.total_pages_fetched} pages from "
                    f"{state.completed_subqueries} subqueries, "
                    f"{state.ids_collected} new IDs"
                )
                state.fetching_done = True
                reporter.notify(force=True)
                for _ in range(NUM_WORKERS):
                    await queue.put(None)

            await asyncio.gather(
                producer(),
                *[
                    _process_worker(queue, listing_processor, state, reporter)
                    for _ in range(NUM_WORKERS)
                ],
                reporter.run_background_flush(),
            )
            reporter.flush()
    except CircuitBreakerOpenError as e:
        celery_logger.error(f"Circuit breaker prevented query: {e}")
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(metrics.summary())
        return []
    finally:
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(
                f"API Stats: {metrics.total_requests} requests, "
                f"avg {metrics.average_response_time:.2f}s, "
                f"{metrics.total_throttling_events} throttled"
            )

    elapsed = time.time() - start_time
    celery_logger.info("=" * 60)
    celery_logger.info(
        f"COMPLETED: Processed {len(state.processed_listings)} listings in {elapsed:.1f}s"
    )
    celery_logger.info("=" * 60)

    # Record scrape metrics
    import api.metrics as m

    m.scrape_listings_found.add(state.ids_collected)
    m.scrape_listings_processed.add(state.processed_count)
    m.scrape_listings_failed.add(state.failed_count)
    m.scrape_duration_seconds.record(elapsed)
    m.scrape_pages_fetched.add(state.total_pages_fetched)
    m.scrape_subqueries_total.add(state.completed_subqueries)

    invalidate_cache()
    _update_task_state(task, "Completed", {
        "phase": PHASE_COMPLETED, "progress": 1,
        "processed": len(state.processed_listings), "total": state.ids_collected,
        "message": f"Processed {len(state.processed_listings)} listings in {elapsed:.0f}s",
    })
    return state.processed_listings


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks from environment configuration."""
    try:
        config = SchedulesConfig.from_env()
    except ValueError as e:
        celery_logger.error(f"Failed to load schedule configuration: {e}")
        return
    for schedule in config.get_enabled_schedules():
        celery_logger.info(
            f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
        )
        sender.add_periodic_task(
            crontab(
                minute=schedule.minute,
                hour=schedule.hour,
                day_of_week=schedule.day_of_week,
            ),
            dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
            name=schedule.name,
        )