Two surfaces are wired up so the user can "get a vibe of the market":

**Per-listing** — each PropertyCard now shows a small pill next to the price when the listing's total_price moved >=1% over a 14-day lookback (e.g. "↓ £200 (-4%) in 14d"). Drops render green, rises render red. The values are computed from `price_history_json` by the daily aggregator and denormalised onto the listing row, so the streaming endpoint just passes them through.

**Macro** — a new always-visible inline strip above the chip strip shows today's median total price, median £/m², and listing count for the current filter's bedroom band, each with a 30-day % delta: "Rent · 1-2 bed · 30d: Median £2,500 ↓ -4% · £/m² £50 ↓ -2% · Listings 4,200 ↑ +5%".

Both data sources are populated daily at 04:00 UTC by a new Celery beat task. It fires 1h after the 03:00 RENT scrape and feeds two sinks:
- a per-listing update pass;
- an upsert into a new `dailylistingaggregate` table keyed on (snapshot_date, listing_type, min_bedrooms, max_bedrooms).

## Backend

- `models/listing.py`: the Listing parent gains `price_14d_ago` and `price_change_pct_14d` nullable floats (inherited by RentListing/BuyListing). New `DailyListingAggregate` table model with a unique constraint on (snapshot_date, listing_type, min_bedrooms, max_bedrooms).
- Alembic `a8b9c0d1e2f3`: adds the two columns to both listing tables and creates the aggregate table + date index.
- `services/market_aggregator.py` (new):
  - `compute_trend_for_listing`;
  - `update_per_listing_trend` (batched, idempotent);
  - `_stats` (median + mean filtered to positive finite values);
  - `compute_aggregate_snapshot` (dialect-aware MySQL / SQLite upsert);
  - `fetch_trend_series` (range query for the API).
- `tasks/market_tasks.py` (new): `compute_daily_market_aggregates_task` Celery task wrapping both stages.
- `tasks/listing_tasks.py:setup_periodic_tasks`: registers the daily task at 04:00 UTC alongside the existing scrape schedules.
- `celery_app.py`: includes the new tasks module.
- `api/app.py`: new `GET /api/market_trend?listing_type=&min_bedrooms=&max_bedrooms=&days=` endpoint returning the daily series.
- `ui_exporter.py`: GeoJSON feature properties now carry `price_14d_ago` and `price_change_pct_14d`, so the frontend can render the badge without an extra round-trip.

## Frontend

- `types/index.ts`: new `MarketTrendPoint`; `PropertyProperties` gains the two optional trend fields.
- `components/PropertyCard.tsx`: derived `trendBadge` (>=1% threshold, null-safe) rendered as a small pill on both card variants.
- `hooks/useMarketTrend.ts` (new): fetches the trend series and derives current-vs-oldest deltas per metric (% change rounded to 1dp).
- `components/MarketTrendStrip.tsx` (new): compact inline strip with three metric cells. It is hidden when the aggregator hasn't produced any rows yet (graceful start during the first week post-launch).
- `App.tsx`: renders the strip above the chip strip whenever the active queryParameters are known.

## Tests

- pytest: 10 new tests:
  - trend-math edge cases: null history, malformed JSON, only-recent entries, drops, rises, zero current price;
  - `_stats`: empty input and non-positive filtering;
  - upsert idempotency on an in-memory SQLite seed.

  34 decision + aggregator tests pass.
- vitest: 8 new tests:
  - useMarketTrend: fetch URL, two-point delta, single-point null delta, empty series;
  - PropertyCard trend badge: arrow direction + sign for drops/rises, noise threshold, null guard.

  229 tests pass in total; tsc is clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
935 lines
35 KiB
Python
935 lines
35 KiB
Python
"""FastAPI application for the Real Estate Crawler API."""
|
||
import asyncio
|
||
from datetime import datetime, timedelta
|
||
import json
|
||
import logging
|
||
import logging.config
|
||
import time
|
||
from typing import Annotated, AsyncGenerator, Optional
|
||
from api.auth import get_current_user
|
||
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS, APP_ENV
|
||
from api.decision_routes import decision_router
|
||
from api.passkey_routes import passkey_router
|
||
from api.perf_routes import perf_router
|
||
from api.poi_routes import poi_router
|
||
from api.ws_routes import ws_router
|
||
from api.rate_limit_config import RateLimitConfig
|
||
from api.rate_limiter import RateLimitMiddleware
|
||
from api.audit_middleware import AuditLogMiddleware
|
||
from api.metrics_guard import MetricsGuardMiddleware
|
||
from api.security_headers import SecurityHeadersMiddleware
|
||
from api.origin_validator import OriginValidatorMiddleware
|
||
from dotenv import load_dotenv
|
||
from fastapi import Depends, FastAPI, HTTPException, Query, Response
|
||
from fastapi.responses import JSONResponse, StreamingResponse
|
||
from pydantic import BaseModel
|
||
from starlette.requests import Request
|
||
from api.auth import User
|
||
from models.listing import QueryParameters, ListingType, FurnishType
|
||
from notifications import send_notification
|
||
from repositories.listing_repository import ListingRepository
|
||
from database import engine
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from ui_exporter import convert_to_geojson_feature, convert_row_to_geojson
|
||
|
||
from services import listing_service, export_service, district_service, task_service
|
||
from services import decision_service
|
||
from services.listing_cache import (
|
||
get_cached_count,
|
||
get_cached_features,
|
||
begin_cache_population,
|
||
cache_features_batch_staged,
|
||
finalize_cache_population,
|
||
delete_staging_key,
|
||
is_cache_stale,
|
||
acquire_repopulation_lock,
|
||
)
|
||
from repositories.poi_repository import POIRepository
|
||
from repositories.decision_repository import DecisionRepository
|
||
from repositories.user_repository import UserRepository
|
||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||
from api.metrics import init_metrics, get_metrics_asgi_app
|
||
import api.metrics as app_metrics
|
||
from logging_config import configure_logging
|
||
|
||
|
||
# Load variables from a .env file into the environment before the
# environment-driven setup below runs (RateLimitConfig.from_env() reads it;
# configure_logging presumably may too — confirm).
load_dotenv()
configure_logging("api")
logger = logging.getLogger(__name__)

# Default number of features per NDJSON "batch" message on the streaming
# endpoint.
DEFAULT_BATCH_SIZE = 50
# Size of the small first ("primer") batch sent before full-size batches so
# the client can start rendering quickly.
FIRST_BATCH_SIZE = 5
# Per-endpoint request caps (listing/geojson limits, stream batch-size cap,
# ...) read once at import time.
_rate_limit_config = RateLimitConfig.from_env()
|
||
|
||
|
||
def get_query_parameters(
    listing_type: ListingType,
    min_bedrooms: int = 1,
    max_bedrooms: int = 999,
    min_price: int = 0,
    max_price: int = 10_000_000,
    min_sqm: Optional[int] = None,
    max_sqm: Optional[int] = None,
    min_price_per_sqm: Optional[float] = None,
    max_price_per_sqm: Optional[float] = None,
    last_seen_days: Optional[int] = None,
    let_date_available_from: Optional[datetime] = None,
    furnish_types: Optional[str] = None,  # comma-separated list
    district_names: Optional[str] = None,  # comma-separated list
) -> QueryParameters:
    """Parse query parameters into a QueryParameters model.

    Used as a FastAPI dependency by the listing endpoints. ``furnish_types``
    and ``district_names`` are comma-separated lists. Surrounding whitespace
    is stripped, and blank entries (``"a,,b"``, a trailing comma) are ignored.

    Raises:
        HTTPException: 400 when ``furnish_types`` contains a value that is
            not a valid ``FurnishType``. Without this, the ``ValueError``
            would escape to the catch-all handler as a generic 500.
    """
    parsed_furnish_types = None
    if furnish_types:
        try:
            parsed_furnish_types = [
                FurnishType(name)
                for name in (part.strip() for part in furnish_types.split(","))
                if name
            ] or None  # only blanks (e.g. ",") -> no furnish filter, like ""
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail="furnish_types contains an unknown furnish type",
            ) from exc

    parsed_district_names: set[str] = set()
    if district_names:
        parsed_district_names = {d.strip() for d in district_names.split(",") if d.strip()}

    return QueryParameters(
        listing_type=listing_type,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        min_sqm=min_sqm,
        max_sqm=max_sqm,
        min_price_per_sqm=min_price_per_sqm,
        max_price_per_sqm=max_price_per_sqm,
        last_seen_days=last_seen_days,
        let_date_available_from=let_date_available_from,
        furnish_types=parsed_furnish_types,
        district_names=parsed_district_names,
    )
|
||
|
||
|
||
# Interactive API docs and the OpenAPI schema are only exposed outside
# production.
app = FastAPI(
    docs_url=None if APP_ENV == "production" else "/docs",
    redoc_url=None if APP_ENV == "production" else "/redoc",
    openapi_url=None if APP_ENV == "production" else "/openapi.json",
)
app.include_router(passkey_router)
app.include_router(perf_router)
app.include_router(poi_router)
app.include_router(decision_router)
app.include_router(ws_router)
# Initialise metrics and expose them as a mounted ASGI app at /metrics
# (access is restricted by MetricsGuardMiddleware below).
init_metrics("realestate-crawler-api")
app.mount("/metrics", get_metrics_asgi_app())



# Allow CORS (for React frontend)
# NOTE(review): the last-added middleware is the outermost, so CORS (added
# first) is the innermost layer. Responses produced by the outer
# middlewares (e.g. 429s from rate limiting, 403s) may therefore lack CORS
# headers, and browsers would report them as CORS errors. Confirm this is
# intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS],
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)

# Origin validation against the same allow-list used for CORS (behaviour
# lives in api.origin_validator).
app.add_middleware(
    OriginValidatorMiddleware,
    allowed_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS],
)

# Security middleware (added bottom-to-top; last added = outermost)
# 3. Rate limiting — enforces per-user limits
app.add_middleware(RateLimitMiddleware, config=_rate_limit_config)
# 2. Metrics guard — blocks unauthorized /metrics access
app.add_middleware(MetricsGuardMiddleware, config=_rate_limit_config)
# 1. Audit logging — logs everything including 429s and 403s
app.add_middleware(AuditLogMiddleware)
# 0. Security headers — adds standard security headers to all responses
app.add_middleware(SecurityHeadersMiddleware)
|
||
|
||
|
||
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Last-resort handler: log the error with its traceback and answer a bare 500.

    No exception detail is sent to the client.
    """
    logger.exception("Unhandled exception")
    payload = {"detail": "Internal server error"}
    return JSONResponse(content=payload, status_code=500)
|
||
|
||
|
||
@app.get("/api/status")
async def get_status(response: Response) -> dict[str, str | None]:
    """Status probe reporting when listing data was last updated.

    Adds a ``Server-Timing`` header with the lookup duration in milliseconds.
    """
    started = time.monotonic()
    last_updated = ListingRepository(engine).get_last_updated()
    elapsed_ms = (time.monotonic() - started) * 1000
    response.headers["Server-Timing"] = f"db_query;dur={elapsed_ms:.1f}"

    last_updated_iso = last_updated.isoformat() if last_updated else None
    return {"status": "OK", "last_updated": last_updated_iso}
|
||
|
||
|
||
@app.get("/api/listing")
async def get_listing(
    user: Annotated[User, Depends(get_current_user)],
    response: Response,
    limit: int = 5,
) -> dict[str, list]:
    """Get listings from the database.

    ``limit`` is clamped to ``[1, listing_limit_cap]``. The lower bound keeps
    zero or negative values from reaching the repository. If such a value
    were forwarded as a SQL LIMIT, it could mean "no limit" (SQLite treats
    negative LIMIT that way) and bypass the cap, or be rejected (MySQL).

    Adds a ``Server-Timing`` header with the service call duration.
    """
    limit = max(1, min(limit, _rate_limit_config.listing_limit_cap))
    repository = ListingRepository(engine)
    t0 = time.monotonic()
    result = await listing_service.get_listings(repository, limit=limit)
    response.headers["Server-Timing"] = f"get_listings;dur={(time.monotonic() - t0) * 1000:.1f}"
    logger.info("Fetched %s listings for %s", result.total_count, user.email)
    return {"listings": result.listings}
|
||
|
||
|
||
@app.get("/api/listing_geojson")
async def get_listing_geojson(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
    response: Response,
    limit: int | None = None,
    decision_filter: str = "all",
) -> dict:
    """Get listings as GeoJSON for map display.

    Args:
        limit: Maximum number of features to export. ``None`` or a
            non-positive value means "use the server cap"; larger values are
            clamped to ``geojson_limit_cap``.
        decision_filter: ``"everything"`` applies no filtering, ``"liked"``
            keeps only the user's liked listings, and any other value (default
            ``"all"``) hides disliked listings. This matches
            ``_should_include`` and the streaming endpoint. A user with no DB
            record has no likes, so ``"liked"`` yields no features for them.

    Adds a ``Server-Timing`` header with per-step durations.
    """
    timings: list[str] = []
    t0_total = time.monotonic()

    cap = _rate_limit_config.geojson_limit_cap
    # Non-positive limits fall back to the cap instead of being forwarded to
    # the repository, where they could read as "unbounded".
    limit = cap if limit is None or limit <= 0 else min(limit, cap)

    repository = ListingRepository(engine)
    t0 = time.monotonic()
    result = await export_service.export_to_geojson(
        repository,
        query_parameters=query_parameters,
        limit=limit,
    )
    timings.append(f"export_geojson;dur={(time.monotonic() - t0) * 1000:.1f}")

    # Apply decision filtering
    if decision_filter != "everything":
        t0 = time.monotonic()
        disliked_ids: set[int] | None = None
        liked_ids: set[int] | None = None
        user_id = _get_user_id_safe(user.email)
        if user_id is not None:
            decision_repo = DecisionRepository(engine)
            listing_type_str = query_parameters.listing_type.value
            if decision_filter == "liked":
                liked_ids = decision_service.get_liked_ids(decision_repo, user_id, listing_type_str)
            else:
                disliked_ids = decision_service.get_disliked_ids(decision_repo, user_id, listing_type_str)
        # Filter even when the user could not be resolved, so "liked" does
        # not fall through to returning every listing.
        result.data["features"] = [
            f
            for f in result.data.get("features", [])
            if _should_include(
                f.get("properties", {}).get("id", 0),
                decision_filter,
                disliked_ids,
                liked_ids,
            )
        ]
        timings.append(f"decision_filter;dur={(time.monotonic() - t0) * 1000:.1f}")

    timings.append(f"total;dur={(time.monotonic() - t0_total) * 1000:.1f}")
    response.headers["Server-Timing"] = ", ".join(timings)
    return result.data
|
||
|
||
|
||
|
||
def _build_poi_distances_lookup(
    user_email: str,
    listing_type: ListingType,
) -> dict[int, list[dict[str, str | int]]] | None:
    """Map listing id -> the user's POI travel entries for that listing.

    Returns None when the user is unknown (or has no id), has no POIs, or
    there are no listings of ``listing_type``. POIs missing from the user's
    POI set are reported with the name ``"Unknown"``.
    """
    db_user = UserRepository(engine).get_user_by_email(user_email)
    if not db_user or db_user.id is None:
        return None
    user_id = db_user.id

    poi_repo = POIRepository(engine)
    poi_by_id = {poi.id: poi for poi in poi_repo.get_pois_for_user(user_id)}
    if not poi_by_id:
        return None

    listing_ids = list(ListingRepository(engine).get_listing_ids(listing_type))
    if not listing_ids:
        return None

    lookup: dict[int, list[dict[str, str | int]]] = {}
    for dist in poi_repo.get_distances_for_listings(listing_ids, listing_type, user_id):
        poi = poi_by_id.get(dist.poi_id)
        entry = {
            "poi_id": dist.poi_id,
            "poi_name": poi.name if poi is not None else "Unknown",
            "travel_mode": dist.travel_mode,
            "duration_seconds": dist.duration_seconds,
            "distance_meters": dist.distance_meters,
        }
        lookup.setdefault(dist.listing_id, []).append(entry)
    return lookup
|
||
|
||
|
||
def _get_user_id_safe(user_email: str) -> int | None:
    """Resolve ``user_email`` to its database user id.

    Returns None if the user does not exist, has no id, or the lookup raises.
    Lookup failures are deliberately swallowed and only logged at DEBUG.
    """
    try:
        db_user = UserRepository(engine).get_user_by_email(user_email)
        return None if db_user is None else db_user.id
    except Exception:
        logger.debug("Could not look up user ID for %s", user_email)
        return None
|
||
|
||
|
||
def _should_include(
|
||
feature_id: int,
|
||
decision_filter: str,
|
||
disliked_ids: set[int] | None,
|
||
liked_ids: set[int] | None,
|
||
) -> bool:
|
||
"""Determine if a listing should be included based on decision filter."""
|
||
if decision_filter == "everything":
|
||
return True
|
||
if decision_filter == "liked":
|
||
return liked_ids is not None and feature_id in liked_ids
|
||
# default "all": hide disliked
|
||
return disliked_ids is None or feature_id not in disliked_ids
|
||
|
||
|
||
async def _stream_from_cache(
    query_parameters: QueryParameters,
    batch_size: int,
    limit: int | None,
    user_email: str | None = None,
    decision_filter: str = "all",
    stale: bool = False,
) -> AsyncGenerator[str, None]:
    """Stream GeoJSON features from the Redis cache (cache-hit path).

    Yields NDJSON lines in this order:

    - one ``metadata`` message (``cached: True`` plus the ``stale`` flag);
    - ``batch`` messages;
    - a ``complete`` message whose ``total`` is the number of features
      actually streamed.

    Features pass through ``_should_include`` unless ``decision_filter`` is
    ``"everything"``. A truthy ``limit`` caps the number of features
    streamed after filtering. The first non-empty batch is split so that a
    primer of ``FIRST_BATCH_SIZE`` features is sent on its own.
    """
    cached_count = get_cached_count(query_parameters)
    effective_total = min(limit, cached_count) if limit and cached_count else cached_count

    yield json.dumps({
        "type": "metadata",
        "batch_size": batch_size,
        "total_expected": effective_total,
        "cached": True,
        "stale": stale,
    }) + "\n"

    # Resolve decision IDs (deferred to after metadata is sent)
    disliked_ids: set[int] | None = None
    liked_ids: set[int] | None = None
    if decision_filter != "everything" and user_email:
        user_id = _get_user_id_safe(user_email)
        if user_id is not None:
            decision_repo = DecisionRepository(engine)
            listing_type_str = query_parameters.listing_type.value
            if decision_filter == "liked":
                liked_ids = decision_service.get_liked_ids(decision_repo, user_id, listing_type_str)
            else:
                disliked_ids = decision_service.get_disliked_ids(decision_repo, user_id, listing_type_str)

    count = 0
    # True until a non-empty batch has been sent. A cached batch emptied by
    # the decision filter must not use up the primer; otherwise the client's
    # first visible batch would be a full-size one.
    primer_pending = True
    for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
        if decision_filter != "everything":
            feature_batch = [
                f for f in feature_batch
                if _should_include(
                    f.get("properties", {}).get("id", 0),
                    decision_filter,
                    disliked_ids,
                    liked_ids,
                )
            ]
        if limit and count + len(feature_batch) > limit:
            feature_batch = feature_batch[:limit - count]
        if not feature_batch:
            # count is unchanged and still below limit here, so skipping the
            # break check below is safe.
            continue

        count += len(feature_batch)
        if primer_pending and len(feature_batch) > FIRST_BATCH_SIZE:
            yield json.dumps({"type": "batch", "features": feature_batch[:FIRST_BATCH_SIZE]}) + "\n"
            yield json.dumps({"type": "batch", "features": feature_batch[FIRST_BATCH_SIZE:]}) + "\n"
        else:
            yield json.dumps({"type": "batch", "features": feature_batch}) + "\n"
        primer_pending = False

        if limit and count >= limit:
            break

    yield json.dumps({"type": "complete", "total": count}) + "\n"
|
||
|
||
|
||
async def _stream_from_db(
    query_parameters: QueryParameters,
    batch_size: int,
    limit: int | None,
    poi_distances_lookup: dict[int, list[dict[str, str | int]]] | None = None,
    skip_cache: bool = False,
    user_email: str | None = None,
    decision_filter: str = "all",
) -> AsyncGenerator[str, None]:
    """Stream GeoJSON features from the database, populating the cache as we go.

    Yields NDJSON lines in this order:

    - ``metadata`` (``cached: False``);
    - ``batch`` messages: the first holds up to ``FIRST_BATCH_SIZE``
      features, later ones up to ``batch_size``;
    - ``complete`` with the number of features streamed.

    Cache population: the Redis cache is keyed on ``query_parameters`` only
    and is shared by every user and request. It is therefore written only
    when both of these hold:

    * ``skip_cache`` is False and no per-user POI distances are injected;
    * the DB stream is not truncated by a request-specific limit. That means
      ``limit`` is None, ``limit >= total``, or ``limit`` has reached the
      global ``geojson_stream_limit_cap``, which cache-hit reads are capped
      to anyway.

    Caching a smaller request's truncated result would make every later
    cache hit for the same query return only that many listings.

    Every fetched feature is cached, including ones hidden by the decision
    filter. Writes keep DB order and go out in chunks of ``batch_size``.
    The staged data is promoted on success. The staging key is deleted if
    the stream fails or is closed early.
    """
    repository = ListingRepository(engine)

    total = repository.count_listings(query_parameters)
    effective_total = min(limit, total) if limit else total

    yield json.dumps({
        "type": "metadata",
        "batch_size": batch_size,
        "total_expected": effective_total,
        "cached": False,
    }) + "\n"

    # Resolve decision IDs (deferred to after metadata is sent)
    disliked_ids: set[int] | None = None
    liked_ids: set[int] | None = None
    if decision_filter != "everything" and user_email:
        user_id = _get_user_id_safe(user_email)
        if user_id is not None:
            decision_repo = DecisionRepository(engine)
            listing_type_str = query_parameters.listing_type.value
            if decision_filter == "liked":
                liked_ids = decision_service.get_liked_ids(decision_repo, user_id, listing_type_str)
            else:
                disliked_ids = decision_service.get_disliked_ids(decision_repo, user_id, listing_type_str)

    truncated_by_request = (
        limit is not None
        and limit < total
        and limit < _rate_limit_config.geojson_stream_limit_cap
    )
    staging_key: str | None = None
    # POI distances are per-user data and must never land in the shared cache.
    if not skip_cache and not poi_distances_lookup and not truncated_by_request:
        staging_key = begin_cache_population(query_parameters)

    try:
        count = 0
        batch: list[dict] = []
        # Features awaiting a staged-cache write, in DB order.
        cache_buffer: list[dict] = []
        current_batch_target = FIRST_BATCH_SIZE  # Start with smaller first batch
        listing_type_value = query_parameters.listing_type.value
        for row in repository.stream_listings_optimized(
            query_parameters, limit=limit, page_size=batch_size
        ):
            feature = convert_row_to_geojson(row, listing_type_value)
            # Inject POI distances if available
            if poi_distances_lookup and row['id'] in poi_distances_lookup:
                feature['properties']['poi_distances'] = poi_distances_lookup[row['id']]

            if staging_key:
                # Cache every feature, even ones this user's filter hides:
                # the cache is shared across users.
                cache_buffer.append(feature)
                if len(cache_buffer) >= batch_size:
                    cache_features_batch_staged(staging_key, cache_buffer)
                    cache_buffer = []

            # Apply decision filtering
            if not _should_include(row['id'], decision_filter, disliked_ids, liked_ids):
                continue

            batch.append(feature)
            count += 1
            if len(batch) >= current_batch_target:
                yield json.dumps({"type": "batch", "features": batch}) + "\n"
                batch = []
                # After first batch, use normal batch size
                current_batch_target = batch_size

        if staging_key and cache_buffer:
            cache_features_batch_staged(staging_key, cache_buffer)
        if batch:
            yield json.dumps({"type": "batch", "features": batch}) + "\n"

        # Atomically promote staged data to live cache
        if staging_key:
            finalize_cache_population(staging_key, query_parameters)
            staging_key = None  # Mark as finalized

        yield json.dumps({"type": "complete", "total": count}) + "\n"
    finally:
        # Clean up orphaned staging key on failure or early close
        if staging_key:
            delete_staging_key(staging_key)
|
||
|
||
|
||
async def _repopulate_cache_background(query_parameters: QueryParameters) -> None:
    """Repopulate the cache from DB in the background (fire-and-forget).

    Does nothing (and records ``result=skipped``) when the repopulation lock
    for these query parameters cannot be acquired. Otherwise it streams
    every matching listing (no limit) into a staging key and promotes the
    key on success.

    Features are written in chunks of ``DEFAULT_BATCH_SIZE``, not one
    staging call per feature. On failure the staging key is deleted,
    ``result=failed`` is recorded and the error is logged; ``Exception`` is
    not re-raised, since nothing awaits this task.
    """
    if not acquire_repopulation_lock(query_parameters):
        app_metrics.cache_repopulation_total.add(1, {"result": "skipped"})
        logger.debug("Skipping background repopulation — already in progress")
        return
    app_metrics.cache_repopulation_total.add(1, {"result": "started"})
    try:
        logger.info("Starting background cache repopulation for stale entry")
        repository = ListingRepository(engine)
        staging_key = begin_cache_population(query_parameters)
        try:
            listing_type_value = query_parameters.listing_type.value
            pending: list[dict] = []
            for row in repository.stream_listings_optimized(
                query_parameters, limit=None, page_size=DEFAULT_BATCH_SIZE
            ):
                pending.append(convert_row_to_geojson(row, listing_type_value))
                if len(pending) >= DEFAULT_BATCH_SIZE:
                    cache_features_batch_staged(staging_key, pending)
                    pending = []
            if pending:
                cache_features_batch_staged(staging_key, pending)
            finalize_cache_population(staging_key, query_parameters)
            app_metrics.cache_repopulation_total.add(1, {"result": "completed"})
            logger.info("Background cache repopulation completed")
        except Exception:
            delete_staging_key(staging_key)
            raise
    except Exception:
        app_metrics.cache_repopulation_total.add(1, {"result": "failed"})
        logger.exception("Background cache repopulation failed")
|
||
|
||
|
||
async def _instrumented_stream(
    inner: AsyncGenerator[str, None],
    source: str,
) -> AsyncGenerator[str, None]:
    """Wrap a streaming generator to record TTFB, total duration, and feature count.

    Metrics are labelled with ``source``. They are recorded once, when this
    wrapper finishes, fails, or is closed (e.g. on client disconnect).

    ``inner`` is always closed explicitly on the way out, so its own
    cleanup runs immediately rather than whenever the event loop finalizes
    the abandoned generator. An example is ``_stream_from_db`` deleting its
    staging cache key.
    """
    t0 = time.monotonic()
    first_yielded = False
    feature_count = 0
    try:
        async for chunk in inner:
            if not first_yielded:
                app_metrics.stream_time_to_first_byte_seconds.record(
                    time.monotonic() - t0, {"source": source}
                )
                first_yielded = True
            # Count features from batch messages; chunks that aren't JSON
            # objects are passed through uncounted.
            try:
                msg = json.loads(chunk)
                if isinstance(msg, dict) and msg.get("type") == "batch" and "features" in msg:
                    feature_count += len(msg["features"])
            except (json.JSONDecodeError, TypeError):
                pass
            yield chunk
    finally:
        try:
            await inner.aclose()
        finally:
            duration = time.monotonic() - t0
            app_metrics.stream_total_duration_seconds.record(duration, {"source": source})
            app_metrics.stream_features_total.add(feature_count, {"source": source})
            app_metrics.stream_requests_total.add(1, {"source": source})
|
||
|
||
|
||
# Strong references to fire-and-forget background tasks. The event loop only
# keeps weak references to tasks, so an otherwise-unreferenced task can be
# garbage-collected before it finishes. Each task discards itself when done.
_background_tasks: set[asyncio.Task] = set()


@app.get("/api/listing_geojson/stream")
async def stream_listing_geojson(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
    batch_size: int = DEFAULT_BATCH_SIZE,
    limit: int | None = None,
    include_poi_distances: bool = False,
    decision_filter: str = "all",
) -> StreamingResponse:
    """Stream listings as NDJSON for progressive map loading.

    Returns newline-delimited JSON with three message types:
    - metadata: Initial message with batch_size and total_expected count
    - batch: Array of GeoJSON features
    - complete: Final message with total count

    Decision filter options:
    - "all" (default): Show all listings except disliked ones
    - "liked": Show only liked listings
    - "everything": Show all listings including disliked

    ``batch_size`` is clamped to ``[1, geojson_stream_batch_size_cap]``.
    ``limit`` is clamped to ``geojson_stream_limit_cap``; ``None`` or a
    non-positive value means "use the cap".

    The response is served from the Redis cache when it holds features for
    these query parameters and POI distances were not requested. POI
    distances are per-user and never cached. A stale cache is still served
    while a background task repopulates it. Otherwise the response is
    streamed from the database.
    """
    # Clamp client-supplied sizes. A non-positive value would otherwise pass
    # straight through min(), and the `if limit` checks in the stream helpers
    # treat 0 as "no limit".
    batch_size = max(1, min(batch_size, _rate_limit_config.geojson_stream_batch_size_cap))
    stream_cap = _rate_limit_config.geojson_stream_limit_cap
    limit = stream_cap if limit is None or limit <= 0 else min(limit, stream_cap)

    timings: list[str] = []

    # Build POI distances lookup if requested
    if include_poi_distances:
        t0 = time.monotonic()
        poi_distances_lookup = _build_poi_distances_lookup(user.email, query_parameters.listing_type)
        timings.append(f"poi_lookup;dur={(time.monotonic() - t0) * 1000:.1f}")
    else:
        poi_distances_lookup = None

    t0 = time.monotonic()
    cached_count = get_cached_count(query_parameters)
    timings.append(f"cache_check;dur={(time.monotonic() - t0) * 1000:.1f}")

    if cached_count is not None and cached_count > 0 and not include_poi_distances:
        app_metrics.geojson_cache_operations.add(1, {"result": "hit"})
        t0 = time.monotonic()
        stale = is_cache_stale(query_parameters)
        timings.append(f"stale_check;dur={(time.monotonic() - t0) * 1000:.1f}")
        timings.append('source;desc="cache"')
        if stale:
            app_metrics.cache_stale_serves_total.add(1)
            # Fire-and-forget background repopulation; hold a reference so the
            # task cannot be garbage-collected mid-run.
            task = asyncio.create_task(_repopulate_cache_background(query_parameters))
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
        generator = _instrumented_stream(
            _stream_from_cache(
                query_parameters, batch_size, limit,
                user_email=user.email,
                decision_filter=decision_filter,
                stale=stale,
            ),
            source="cache",
        )
    else:
        app_metrics.geojson_cache_operations.add(1, {"result": "miss"})
        timings.append('source;desc="db"')
        generator = _instrumented_stream(
            _stream_from_db(
                query_parameters, batch_size, limit, poi_distances_lookup,
                skip_cache=include_poi_distances,
                user_email=user.email,
                decision_filter=decision_filter,
            ),
            source="db",
        )

    return StreamingResponse(
        generator,
        media_type="application/x-ndjson",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
            "Server-Timing": ", ".join(timings),
        }
    )
|
||
|
||
|
||
@app.post("/api/refresh_listings")
async def refresh_listings(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
) -> dict[str, str]:
    """Start an asynchronous listing refresh for the given query.

    A Slack notification is scheduled without being awaited. The task id
    (if any) is recorded against the caller so the task-status endpoints can
    find it. Returns the task id (``""`` when none) and the service message.
    """
    notification = (
        f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
    )
    # Not awaited, so the response isn't blocked on the webhook round-trip
    # (and the no-op path when SLACK_WEBHOOK_URL is unset adds no latency).
    # send_notification already catches its own exceptions, so an orphaned
    # task is harmless.
    asyncio.create_task(send_notification(notification))

    result = await listing_service.refresh_listings(
        ListingRepository(engine),
        query_parameters,
        async_mode=True,
        user_email=user.email,
    )

    task_id = result.task_id
    if task_id:
        task_service.add_task_for_user(user.email, task_id)
    return {"task_id": task_id or "", "message": result.message}
|
||
|
||
|
||
@app.get("/api/task_status")
async def get_task_status(
    user: Annotated[User, Depends(get_current_user)],
    task_id: str,
) -> dict[str, str | int | float | None]:
    """Report the state and progress of one of the caller's background tasks.

    Responds 404 unless ``task_id`` is registered to the current user.
    ``result`` is JSON-encoded when truthy. ``error`` and ``traceback`` are
    blanked out in production.
    """
    if task_id not in task_service.get_user_tasks(user.email):
        raise HTTPException(status_code=404, detail="Task not found")

    status = task_service.get_task_status(task_id)
    expose_debug = APP_ENV != "production"
    encoded_result = json.dumps(status.result) if status.result else None
    return {
        "task_id": status.task_id,
        "status": status.status,
        "result": encoded_result,
        "progress": status.progress,
        "processed": status.processed,
        "total": status.total,
        "message": status.message,
        "error": status.error if expose_debug else None,
        "traceback": status.traceback if expose_debug else None,
    }
|
||
|
||
|
||
@app.get("/api/tasks_for_user")
async def get_tasks_for_user(
    user: Annotated[User, Depends(get_current_user)],
) -> list[str]:
    """Return the IDs of every background task registered to the caller."""
    task_ids = task_service.get_user_tasks(user.email)
    return task_ids
|
||
|
||
|
||
@app.post("/api/cancel_task")
async def cancel_task(
    user: Annotated[User, Depends(get_current_user)],
    task_id: str = Query(..., description="The task ID to cancel"),
) -> dict[str, str | bool]:
    """Cancel a running task and remove it from the user's task list.

    Raises:
        HTTPException: 404 if ``task_id`` is not in the caller's task list.

    When cancellation fails, returns ``{"success": False, ...}`` with HTTP
    200. The underlying error text is only included outside production,
    matching how ``/api/task_status`` hides ``error``/``traceback``.
    """
    # Verify user owns this task
    user_tasks = task_service.get_user_tasks(user.email)
    if task_id not in user_tasks:
        raise HTTPException(status_code=404, detail="Task not found or not owned by user")

    try:
        task_service.cancel_task(task_id, user_email=user.email)
    except Exception as e:
        # API boundary: log with traceback, report failure without leaking
        # internals in production.
        logger.exception("Failed to cancel task %s", task_id)
        message = str(e) if APP_ENV != "production" else "Failed to cancel task"
        return {"success": False, "message": message}

    logger.info("Task %s cancelled by %s", task_id, user.email)
    return {"success": True, "message": "Task cancelled"}
|
||
|
||
|
||
@app.post("/api/clear_all_tasks")
async def clear_all_tasks(
    user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str | int | bool]:
    """Clear all tasks for the current user.

    Returns ``success``, the number of tasks cleared and a message. When the
    service call fails, returns ``{"success": False, "count": 0, ...}``.
    The underlying error text is only included outside production, matching
    how ``/api/task_status`` hides ``error``/``traceback``.
    """
    try:
        count = task_service.clear_all_tasks(user.email)
    except Exception as e:
        # API boundary: log with traceback, report failure without leaking
        # internals in production.
        logger.exception("Failed to clear tasks for %s", user.email)
        message = str(e) if APP_ENV != "production" else "Failed to clear tasks"
        return {"success": False, "count": 0, "message": message}

    logger.info("Cleared %s tasks for %s", count, user.email)
    return {"success": True, "count": count, "message": f"Cleared {count} tasks"}
|
||
|
||
|
||
@app.get("/api/get_districts")
async def get_districts(
    user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
    """Return every district known to ``district_service`` (authenticated callers only)."""
    districts = district_service.get_all_districts()
    return districts
|
||
|
||
|
||
# Response item for GET /api/market_trend. Field order below is the JSON key
# order clients receive. The class docstring is kept as-is because pydantic
# uses it as the OpenAPI schema description.
class MarketTrendPoint(BaseModel):
    """One day of aggregated market stats for the (listing_type, bed-band)."""

    # ISO-8601 date string; get_market_trend builds it with
    # snapshot_date.isoformat().
    snapshot_date: str
    # Presumably the number of listings aggregated for that day — confirm
    # against services.market_aggregator.
    listing_count: int
    # Price statistics; None when the aggregate row has no value.
    # "qmprice" is presumably price per m² (the "£/m²" figure shown in the
    # trend strip) — confirm.
    median_total_price: float | None
    median_qmprice: float | None
    mean_total_price: float | None
    mean_qmprice: float | None
|
||
|
||
|
||
@app.get("/api/market_trend", response_model=list[MarketTrendPoint])
async def get_market_trend(
    user: Annotated[User, Depends(get_current_user)],
    listing_type: str = Query("RENT", description="RENT or BUY"),
    min_bedrooms: int = Query(1, ge=0),
    max_bedrooms: int = Query(2, ge=0),
    days: int = Query(30, ge=1, le=365, description="Lookback window in days"),
) -> list[MarketTrendPoint]:
    """Return the daily market snapshots for one (type × bed-band) over the last N days.

    Backs the MarketTrendStrip UI. Responds 400 for an unknown listing type
    or an inverted bedroom range.
    """
    # Imported inside the handler (kept lazy, as in the original design).
    from services.market_aggregator import fetch_trend_series  # noqa: PLC0415

    if listing_type not in ("RENT", "BUY"):
        raise HTTPException(status_code=400, detail="listing_type must be RENT or BUY")
    if max_bedrooms < min_bedrooms:
        raise HTTPException(status_code=400, detail="min_bedrooms must be <= max_bedrooms")

    series = fetch_trend_series(
        engine,
        listing_type=listing_type,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        days=days,
    )

    def to_point(snapshot) -> MarketTrendPoint:
        """Convert one aggregate row into its API representation."""
        return MarketTrendPoint(
            snapshot_date=snapshot.snapshot_date.isoformat(),
            listing_count=snapshot.listing_count,
            median_total_price=snapshot.median_total_price,
            median_qmprice=snapshot.median_qmprice,
            mean_total_price=snapshot.mean_total_price,
            mean_qmprice=snapshot.mean_qmprice,
        )

    return [to_point(snapshot) for snapshot in series]
|
||
|
||
|
||
# Response model for GET /api/listing/{listing_id}/detail. Kept without a
# class docstring so the OpenAPI schema is unchanged.
class ListingDetailResponse(BaseModel):
    id: int
    price: float
    number_of_bedrooms: int
    square_meters: float | None
    agency: str | None
    council_tax_band: str | None
    url: str
    listing_type: str
    # Free-text description; get_listing_detail reads it from
    # additional_info["property"]["text"]["description"].
    description: str | None
    display_address: str | None
    property_sub_type: str | None
    key_features: list[str]
    # Entries with "url" (prefers maxSizeUrl), "caption" and "type" keys, as
    # built in get_listing_detail.
    photos: list[dict]
    floorplans: list[dict]
    # NOTE(review): entry schema not visible here — confirm against the
    # price-history source.
    price_history: list[dict]
    furnish_type: str | None
    available_from: str | None
    service_charge: float | None
    lease_left: int | None
    # Presumably the caller's like/dislike decision, None when undecided —
    # confirm.
    decision: str | None
    # Presumably the same entry shape as _build_poi_distances_lookup produces
    # (poi_id, poi_name, travel_mode, duration_seconds, distance_meters) —
    # confirm.
    poi_distances: list[dict]
|
||
|
||
|
||
@app.get("/api/listing/{listing_id}/detail", response_model=ListingDetailResponse)
async def get_listing_detail(
    user: Annotated[User, Depends(get_current_user)],
    listing_id: int,
    response: Response,
    listing_type: str = Query(default="RENT"),
) -> ListingDetailResponse:
    """Get detailed information for a single listing.

    Combines the stored listing row with fields parsed out of the raw scraped
    ``additional_info["property"]`` payload, the caller's saved decision for
    the listing (if any), and the caller's POI travel distances.

    Per-step durations are recorded on
    ``app_metrics.listing_detail_step_duration_seconds`` and echoed to the
    client in a ``Server-Timing`` response header.

    Raises:
        HTTPException(400): ``listing_type`` is not a valid ``ListingType``.
        HTTPException(404): no listing with ``listing_id`` of that type exists.
    """
    timings: list[str] = []
    t0_total = time.monotonic()

    # Reject unknown listing types with a 400 instead of letting the
    # ListingType constructor's ValueError (Enum lookup failure) escape as a 500.
    try:
        lt = ListingType(listing_type)
    except ValueError:
        raise HTTPException(
            status_code=400, detail=f"Invalid listing_type: {listing_type!r}"
        ) from None

    # --- Fetch the listing row ------------------------------------------------
    repository = ListingRepository(engine)
    t_step = time.monotonic()
    listings = await repository.get_listings(only_ids=[listing_id], listing_type=lt)
    # Measure once so the metric and the Server-Timing entry agree.
    elapsed = time.monotonic() - t_step
    app_metrics.listing_detail_step_duration_seconds.record(
        elapsed, {"step": "fetch_listing"}
    )
    timings.append(f"fetch_listing;dur={elapsed * 1000:.1f}")
    if not listings:
        raise HTTPException(status_code=404, detail="Listing not found")

    listing = listings[0]

    # --- Parse the raw scraped payload ----------------------------------------
    t_parse = time.monotonic()
    # Treat a missing/non-dict payload, or an explicit null "property" entry,
    # as empty so the .get() calls below cannot raise AttributeError.
    additional_info = listing.additional_info
    if not isinstance(additional_info, dict):
        additional_info = {}
    property_info = additional_info.get("property")
    if not isinstance(property_info, dict):
        property_info = {}

    text_info = property_info.get("text", {})
    description = text_info.get("description") if isinstance(text_info, dict) else None

    # Photos: the Rightmove API stores them under "photos", but some code
    # paths used "images". Prefer the high-res maxSizeUrl when present.
    photos_raw = property_info.get("images", []) or property_info.get("photos", [])
    photos: list[dict] = []
    if isinstance(photos_raw, list):
        photos = [
            {
                "url": img.get("maxSizeUrl") or img.get("url", ""),
                "caption": img.get("caption", ""),
                "type": img.get("type", ""),
            }
            for img in photos_raw
            if isinstance(img, dict)
        ]

    floorplans_raw = property_info.get("floorplans", [])
    floorplans: list[dict] = []
    if isinstance(floorplans_raw, list):
        floorplans = [
            {"url": fp.get("url", ""), "caption": fp.get("caption", "")}
            for fp in floorplans_raw
            if isinstance(fp, dict)
        ]

    # The response model declares key_features as list[str]; drop non-string
    # entries so one malformed item cannot fail response validation.
    key_features_raw = property_info.get("keyFeatures", [])
    key_features: list[str] = (
        [feature for feature in key_features_raw if isinstance(feature, str)]
        if isinstance(key_features_raw, list)
        else []
    )

    display_address_info = property_info.get("address", {})
    display_address = (
        display_address_info.get("displayAddress")
        if isinstance(display_address_info, dict)
        else None
    )
    property_sub_type = property_info.get("propertySubType")
    council_tax_band = property_info.get("councilTaxBand") or listing.council_tax_band
    furnish_type_val = property_info.get("letFurnishType")
    available_from_val = property_info.get("letDateAvailable")

    price_history = [item.to_dict() for item in (listing.price_history or [])]

    # Subclass-specific columns: service_charge / lease_left (BuyListing),
    # available_from / furnish_type (RentListing). getattr with a default
    # yields None when the loaded model lacks the attribute.
    service_charge: float | None = getattr(listing, "service_charge", None)
    lease_left: int | None = getattr(listing, "lease_left", None)

    # Fall back to the DB columns when the scraped payload lacks the value.
    if available_from_val is None:
        af = getattr(listing, "available_from", None)
        if af is not None:
            available_from_val = af.isoformat() if hasattr(af, "isoformat") else str(af)
    if furnish_type_val is None:
        ft = getattr(listing, "furnish_type", None)
        if ft is not None:
            furnish_type_val = str(ft)

    timings.append(f"parse_detail;dur={(time.monotonic() - t_parse) * 1000:.1f}")

    # --- Load the caller's decision for this listing --------------------------
    t_step = time.monotonic()
    decision_val: str | None = None
    user_id = _get_user_id_safe(user.email)
    if user_id is not None:
        # NOTE(review): this loads every decision the user has made and scans
        # for one match; a repository lookup keyed on the listing would avoid it.
        decisions = DecisionRepository(engine).get_decisions_for_user(user_id)
        decision_val = next(
            (
                d.decision
                for d in decisions
                if d.listing_id == listing_id and d.listing_type == listing_type
            ),
            None,
        )
    elapsed = time.monotonic() - t_step
    app_metrics.listing_detail_step_duration_seconds.record(
        elapsed, {"step": "load_decision"}
    )
    timings.append(f"load_decision;dur={elapsed * 1000:.1f}")

    # --- Load POI travel distances --------------------------------------------
    t_step = time.monotonic()
    poi_distances_list: list[dict] = []
    if user_id is not None:
        poi_repo = POIRepository(engine)
        pois = {p.id: p for p in poi_repo.get_pois_for_user(user_id)}
        if pois:
            distances = poi_repo.get_distances_for_listings([listing_id], lt, user_id)
            for dist in distances:
                poi = pois.get(dist.poi_id)
                poi_distances_list.append({
                    "poi_id": dist.poi_id,
                    "poi_name": poi.name if poi is not None else "Unknown",
                    "travel_mode": dist.travel_mode,
                    "duration_seconds": dist.duration_seconds,
                    "distance_meters": dist.distance_meters,
                })
    elapsed = time.monotonic() - t_step
    app_metrics.listing_detail_step_duration_seconds.record(
        elapsed, {"step": "load_poi_distances"}
    )
    timings.append(f"load_poi_distances;dur={elapsed * 1000:.1f}")

    timings.append(f"total;dur={(time.monotonic() - t0_total) * 1000:.1f}")
    response.headers["Server-Timing"] = ", ".join(timings)

    return ListingDetailResponse(
        id=listing.id,
        price=listing.price,
        number_of_bedrooms=listing.number_of_bedrooms,
        square_meters=listing.square_meters,
        agency=listing.agency,
        council_tax_band=council_tax_band,
        url=listing.url,
        listing_type=listing_type,
        description=description,
        display_address=display_address,
        property_sub_type=property_sub_type,
        key_features=key_features,
        photos=photos,
        floorplans=floorplans,
        price_history=price_history,
        furnish_type=furnish_type_val,
        available_from=available_from_val,
        service_charge=service_charge,
        lease_left=lease_left,
        decision=decision_val,
        poi_distances=poi_distances_list,
    )
|
||
|
||
|
||
# Attach FastAPIInstrumentor to the app once all routes above are declared.
# NOTE(review): presumably opentelemetry-instrumentation-fastapi, which adds
# per-request tracing middleware — confirm against the file's imports.
FastAPIInstrumentor.instrument_app(app)
|