"""Unified listing service - shared between CLI and HTTP API. This module provides the core business logic for listing operations. Both the CLI (main.py) and HTTP API (api/app.py) should use these functions. """ from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any from models.listing import Listing, QueryParameters from repositories.listing_repository import ListingRepository @dataclass class ListingResult: """Result of a listing operation.""" listings: list[Listing] total_count: int message: str | None = None @dataclass class RefreshResult: """Result of a refresh operation.""" task_id: str | None # None if run synchronously new_listings_count: int message: str async def get_listings( repository: ListingRepository, query_parameters: QueryParameters | None = None, limit: int | None = None, only_ids: list[int] | None = None, ) -> ListingResult: """Get listings from the database with optional filtering. Used by: - CLI: export-csv, export-immoweb - API: GET /api/listing, GET /api/listing_geojson """ listings = await repository.get_listings( query_parameters=query_parameters, limit=limit, only_ids=only_ids, ) return ListingResult( listings=listings, total_count=len(listings), ) async def refresh_listings( repository: ListingRepository, query_parameters: QueryParameters, full: bool = False, async_mode: bool = False, user_email: str | None = None, ) -> RefreshResult: """Refresh listings by fetching from external API. Args: repository: Database repository query_parameters: Filtering parameters full: If True, also fetch images and run OCR async_mode: If True, run as background task and return task_id user_email: User email for tracking (API mode) Used by: - CLI: dump-listings - API: POST /api/refresh_listings """ if async_mode: # Import here to avoid circular imports from tasks.listing_tasks import dump_listings_task from datetime import timedelta expiry_time = datetime.now() + timedelta(minutes=10) task = dump_listings_task.apply_async( args=(query_parameters.model_dump_json(),), expires=expiry_time, ) return RefreshResult( task_id=task.id, new_listings_count=0, message=f"Task {task.id} started", ) # Synchronous mode - run directly from services.listing_fetcher import dump_listings, dump_listings_full if full: new_listings = await dump_listings_full(query_parameters, repository) else: new_listings = await dump_listings(query_parameters, repository) return RefreshResult( task_id=None, new_listings_count=len(new_listings), message=f"Fetched {len(new_listings)} new listings", ) async def download_images( repository: ListingRepository, data_dir: Path = Path("data/rs/"), ) -> int: """Download floorplan images for all listings. Used by: - CLI: dump-images - API: (could be added) Returns: Number of listings processed """ from services.image_fetcher import dump_images await dump_images(repository, image_base_path=data_dir) listings = await repository.get_listings() return len(listings) async def detect_floorplans( repository: ListingRepository, ) -> int: """Run OCR on floorplan images to detect square meters. Used by: - CLI: detect-floorplan - API: (could be added) Returns: Number of listings processed """ from services.floorplan_detector import detect_floorplan await detect_floorplan(repository) listings = await repository.get_listings() return len(listings) async def calculate_routes( repository: ListingRepository, destination_address: str, travel_mode: str, limit: int | None = None, ) -> int: """Calculate transit routes for listings. Used by: - CLI: routing - API: (could be added) Returns: Number of listings processed """ from services.route_calculator import calculate_route from rec.routing import TravelMode await calculate_route( repository, destination_address, TravelMode[travel_mode], limit=limit, ) return limit or 0