{EM_DASH};
+
+ const formatted = (() => {
+ if (kind === 'price') return formatPrice(delta.current);
+ if (kind === 'pricePerSqm') return `£${Math.round(delta.current)}/m²`;
+ return delta.current.toLocaleString();
+ })();
+
+ if (delta.changePct === null || delta.previous === null) {
+ return {formatted};
+ }
+
+ // Drops in price/£m² are good (green); rises are red. For listing count
+ // direction has no inherent good/bad — just show the move neutrally.
+ const isPriceMetric = kind === 'price' || kind === 'pricePerSqm';
+ const dropped = delta.changePct < 0;
+ const colour = isPriceMetric
+ ? (Math.abs(delta.changePct) < 0.1
+ ? 'text-muted-foreground'
+ : dropped
+ ? 'text-[var(--deal-good)]'
+ : 'text-[var(--deal-above)]')
+ : 'text-muted-foreground';
+ const arrow = Math.abs(delta.changePct) < 0.1 ? '·' : dropped ? '↓' : '↑';
+ const sign = delta.changePct > 0 ? '+' : '';
+ return (
+
+ {formatted}{' '}
+
+ {arrow} {sign}{delta.changePct}%
+
+
+ );
+}
+
+export function MarketTrendStrip({
+ user,
+ listingType,
+ minBedrooms,
+ maxBedrooms,
+ days = 30,
+}: Props) {
+ const { series, isLoading, deltas } = useMarketTrend(
+ user,
+ listingType,
+ minBedrooms,
+ maxBedrooms,
+ days,
+ );
+
+ if (isLoading && series.length === 0) {
+ return (
+
+ Loading market trend…
+
+ );
+ }
+ // Hide entirely when we have no data at all (the daily aggregator hasn't
+ // produced any rows yet — common during the first week post-launch).
+ if (series.length === 0) return null;
+
+ const label = `${listingType === 'RENT' ? 'Rent' : 'Buy'} · ${minBedrooms}-${maxBedrooms} bed · ${days}d`;
+
+ return (
+
+ {label}:
+ Median
+ ·
+ £/m²
+ ·
+ Listings
+
+ );
+}
diff --git a/frontend/src/components/PropertyCard.tsx b/frontend/src/components/PropertyCard.tsx
index dd1b31b..9702124 100644
--- a/frontend/src/components/PropertyCard.tsx
+++ b/frontend/src/components/PropertyCard.tsx
@@ -228,6 +228,28 @@ export function PropertyCard({
? { dotColor: 'bg-[var(--deal-above)]', label: 'Above avg' }
: null;
+ // Per-listing trend badge — surfaces when the daily aggregator has
+ // computed a non-trivial price move over the 14d lookback window.
+ // Threshold = 1% so we don't render noise from rounding or minor jitters.
+ const trendBadge = (() => {
+ const pct = property.price_change_pct_14d;
+ const past = property.price_14d_ago;
+ if (typeof pct !== 'number' || !isFiniteNumber(pct)) return null;
+ if (Math.abs(pct) < 1) return null;
+ const dropped = pct < 0;
+ const deltaAbs = isFiniteNumber(past) && isFiniteNumber(property.total_price)
+ ? Math.abs(property.total_price - past)
+ : null;
+ return {
+ dropped,
+ label: `${dropped ? '↓' : '↑'} ${deltaAbs !== null ? formatPrice(deltaAbs) : ''} (${pct > 0 ? '+' : ''}${pct}%) in 14d`.replace(/\s+/g, ' ').trim(),
+ // Drops are good for the buyer; greens for drop, reds for rise.
+ className: dropped
+ ? 'text-[var(--deal-good)] bg-[var(--deal-good)]/10 border-[var(--deal-good)]/40'
+ : 'text-[var(--deal-above)] bg-[var(--deal-above)]/10 border-[var(--deal-above)]/40',
+ };
+ })();
+
const handleClick = () => {
onClick?.();
};
@@ -266,6 +288,14 @@ export function PropertyCard({
{priceIndicator && (
{priceIndicator.label}
)}
+ {trendBadge && (
+
+ {trendBadge.label}
+
+ )}
{/* Key metrics on one line */}
@@ -344,6 +374,14 @@ export function PropertyCard({
{priceIndicator && (
{priceIndicator.label}
)}
+ {trendBadge && (
+
+ {trendBadge.label}
+
+ )}
{/* Key metrics on one line */}
diff --git a/frontend/src/components/__tests__/PropertyCard.test.tsx b/frontend/src/components/__tests__/PropertyCard.test.tsx
index 4d7bcc5..c35da81 100644
--- a/frontend/src/components/__tests__/PropertyCard.test.tsx
+++ b/frontend/src/components/__tests__/PropertyCard.test.tsx
@@ -223,4 +223,51 @@ describe('PropertyCard', () => {
expect(container.querySelector('button[aria-label="Previous photo"]')).not.toBeInTheDocument();
expect(container.querySelector('button[aria-label="Next photo"]')).not.toBeInTheDocument();
});
+
+ // Price-trend badge — renders for moves >=1%, hidden for noise / nulls.
+ it('renders a "↓" trend badge when price dropped >1% in 14d', () => {
+ const property = {
+ ...createMockProperty({ total_price: 2400 }),
+ price_14d_ago: 2500,
+ price_change_pct_14d: -4,
+ } as unknown as PropertyProperties;
+ const { container } = render();
+ const text = container.textContent ?? '';
+ expect(text).toMatch(/↓/);
+ expect(text).toMatch(/-4%/);
+ expect(text).toMatch(/14d/);
+ });
+
+ it('renders a "↑" trend badge when price rose >1% in 14d', () => {
+ const property = {
+ ...createMockProperty({ total_price: 2200 }),
+ price_14d_ago: 2000,
+ price_change_pct_14d: 10,
+ } as unknown as PropertyProperties;
+ const { container } = render();
+ const text = container.textContent ?? '';
+ expect(text).toMatch(/↑/);
+ expect(text).toMatch(/\+10%/);
+ });
+
+ it('omits the trend badge when the move is < 1% (noise threshold)', () => {
+ const property = {
+ ...createMockProperty({ total_price: 2510 }),
+ price_14d_ago: 2500,
+ price_change_pct_14d: 0.4,
+ } as unknown as PropertyProperties;
+ const { container } = render();
+ expect(container.textContent).not.toMatch(/↑|↓/);
+ expect(container.textContent).not.toMatch(/14d/);
+ });
+
+ it('omits the trend badge when price_change_pct_14d is null', () => {
+ const property = {
+ ...createMockProperty({ total_price: 2500 }),
+ price_14d_ago: null,
+ price_change_pct_14d: null,
+ } as unknown as PropertyProperties;
+ const { container } = render();
+ expect(container.textContent).not.toMatch(/14d/);
+ });
});
diff --git a/frontend/src/hooks/__tests__/useMarketTrend.test.tsx b/frontend/src/hooks/__tests__/useMarketTrend.test.tsx
new file mode 100644
index 0000000..b605b3d
--- /dev/null
+++ b/frontend/src/hooks/__tests__/useMarketTrend.test.tsx
@@ -0,0 +1,94 @@
+import { describe, it, expect, vi, beforeEach } from 'vitest';
+import { renderHook, waitFor } from '@testing-library/react';
+import { useMarketTrend } from '@/hooks/useMarketTrend';
+import type { AuthUser } from '@/auth/types';
+
+vi.mock('@/services/apiClient', () => ({
+ apiRequest: vi.fn(),
+}));
+import { apiRequest } from '@/services/apiClient';
+
+const apiRequestMock = vi.mocked(apiRequest);
+
+const user: AuthUser = {
+ sub: 'u',
+ email: 'a@b.com',
+ name: 'A',
+ accessToken: 'tok',
+ provider: 'oidc',
+};
+
+describe('useMarketTrend — series fetch + delta derivation', () => {
+ beforeEach(() => {
+ apiRequestMock.mockReset();
+ });
+
+ it('hits /api/market_trend with the requested filter params', async () => {
+ apiRequestMock.mockResolvedValue([]);
+ const { result } = renderHook(() => useMarketTrend(user, 'RENT', 1, 2, 30));
+ await waitFor(() => expect(result.current.isLoading).toBe(false));
+ expect(apiRequestMock).toHaveBeenCalledWith(
+ user,
+ expect.stringContaining('/api/market_trend?'),
+ );
+ const url = apiRequestMock.mock.calls[0][1] as string;
+ expect(url).toContain('listing_type=RENT');
+ expect(url).toContain('min_bedrooms=1');
+ expect(url).toContain('max_bedrooms=2');
+ expect(url).toContain('days=30');
+ });
+
+ it('computes deltas between the oldest and newest in-window points', async () => {
+ apiRequestMock.mockResolvedValue([
+ {
+ snapshot_date: '2026-04-16',
+ listing_count: 1000,
+ median_total_price: 2500,
+ median_qmprice: 50,
+ mean_total_price: 2600,
+ mean_qmprice: 52,
+ },
+ {
+ snapshot_date: '2026-05-16',
+ listing_count: 1050,
+ median_total_price: 2400,
+ median_qmprice: 48,
+ mean_total_price: 2500,
+ mean_qmprice: 50,
+ },
+ ]);
+ const { result } = renderHook(() => useMarketTrend(user, 'RENT', 1, 2, 30));
+ await waitFor(() => expect(result.current.series.length).toBe(2));
+ expect(result.current.deltas.median_total_price.current).toBe(2400);
+ expect(result.current.deltas.median_total_price.previous).toBe(2500);
+ // (2400 - 2500) / 2500 * 100 = -4
+ expect(result.current.deltas.median_total_price.changePct).toBe(-4);
+ expect(result.current.deltas.listing_count.changePct).toBe(5);
+ });
+
+ it('returns null changePct when there is only one datapoint', async () => {
+ apiRequestMock.mockResolvedValue([
+ {
+ snapshot_date: '2026-05-16',
+ listing_count: 1050,
+ median_total_price: 2400,
+ median_qmprice: 48,
+ mean_total_price: 2500,
+ mean_qmprice: 50,
+ },
+ ]);
+ const { result } = renderHook(() => useMarketTrend(user, 'RENT', 1, 2, 30));
+ await waitFor(() => expect(result.current.series.length).toBe(1));
+ expect(result.current.deltas.median_total_price.changePct).toBeNull();
+ expect(result.current.deltas.median_total_price.current).toBe(2400);
+ });
+
+ it('returns empty series + null deltas when the endpoint is empty', async () => {
+ apiRequestMock.mockResolvedValue([]);
+ const { result } = renderHook(() => useMarketTrend(user, 'BUY', 1, 2, 30));
+ await waitFor(() => expect(result.current.isLoading).toBe(false));
+ expect(result.current.series).toEqual([]);
+ expect(result.current.deltas.median_total_price.current).toBeNull();
+ expect(result.current.deltas.median_total_price.changePct).toBeNull();
+ });
+});
diff --git a/frontend/src/hooks/useMarketTrend.ts b/frontend/src/hooks/useMarketTrend.ts
new file mode 100644
index 0000000..f3d0f2d
--- /dev/null
+++ b/frontend/src/hooks/useMarketTrend.ts
@@ -0,0 +1,93 @@
+// Fetches the daily market aggregate series for a given listing-type +
+// bedroom band. Re-fetches when the inputs change. Returns the raw array
+// of points plus a derived "now vs N days ago" delta the strip renders.
+
+import { useEffect, useState } from 'react';
+import type { AuthUser } from '@/auth/types';
+import type { MarketTrendPoint } from '@/types';
+import { apiRequest } from '@/services/apiClient';
+
+export interface MarketTrendDelta {
+ metric: 'median_total_price' | 'median_qmprice' | 'listing_count';
+ current: number | null;
+ previous: number | null;
+ changePct: number | null;
+}
+
+export interface UseMarketTrendResult {
+ series: MarketTrendPoint[];
+ isLoading: boolean;
+ error: string | null;
+ // Convenience: today's value vs the oldest in-window value.
+ deltas: Record;
+}
+
+function buildDelta(
+ metric: MarketTrendDelta['metric'],
+ series: MarketTrendPoint[],
+): MarketTrendDelta {
+ if (series.length < 2) {
+ const only = series[0];
+ return {
+ metric,
+ current: only ? (only[metric] as number | null) : null,
+ previous: null,
+ changePct: null,
+ };
+ }
+ const current = series[series.length - 1][metric] as number | null;
+ const previous = series[0][metric] as number | null;
+ if (current === null || previous === null || previous === 0) {
+ return { metric, current, previous, changePct: null };
+ }
+ const changePct = Math.round(((current - previous) / previous) * 1000) / 10;
+ return { metric, current, previous, changePct };
+}
+
+export function useMarketTrend(
+ user: AuthUser | null,
+ listingType: 'RENT' | 'BUY',
+ minBedrooms: number,
+ maxBedrooms: number,
+ days: number = 30,
+): UseMarketTrendResult {
+ const [series, setSeries] = useState([]);
+ const [isLoading, setIsLoading] = useState(false);
+ const [error, setError] = useState(null);
+
+ useEffect(() => {
+ if (!user) return;
+ let cancelled = false;
+ setIsLoading(true);
+ setError(null);
+ const params = new URLSearchParams({
+ listing_type: listingType,
+ min_bedrooms: String(minBedrooms),
+ max_bedrooms: String(maxBedrooms),
+ days: String(days),
+ });
+ apiRequest(user, `/api/market_trend?${params}`)
+ .then((data) => {
+ if (cancelled) return;
+ setSeries(data);
+ })
+ .catch((err: Error) => {
+ if (cancelled) return;
+ setError(err.message);
+ })
+ .finally(() => {
+ if (!cancelled) setIsLoading(false);
+ });
+ return () => {
+ cancelled = true;
+ };
+ }, [user, listingType, minBedrooms, maxBedrooms, days]);
+
+ const deltas: UseMarketTrendResult['deltas'] = {
+ median_total_price: buildDelta('median_total_price', series),
+ median_qmprice: buildDelta('median_qmprice', series),
+ listing_count: buildDelta('listing_count', series),
+ };
+
+ return { series, isLoading, error, deltas };
+}
diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts
index aabb2b2..10c2374 100644
--- a/frontend/src/types/index.ts
+++ b/frontend/src/types/index.ts
@@ -24,6 +24,10 @@ export interface PropertyProperties {
price_history: PropertyPriceHistory[];
listing_type?: 'RENT' | 'BUY';
poi_distances?: POIDistanceInfo[];
+ // Trend snapshot maintained by the daily market aggregator (nullable
+ // until the aggregator has run at least once with this listing in scope).
+ price_14d_ago?: number | null;
+ price_change_pct_14d?: number | null;
}
export interface PropertyFeature {
@@ -181,6 +185,16 @@ export interface WSPongMessage {
export type WSMessage = WSInitMessage | WSTaskUpdateMessage | WSPongMessage;
+// One day of aggregated market stats — see /api/market_trend.
+export interface MarketTrendPoint {
+ snapshot_date: string; // ISO date
+ listing_count: number;
+ median_total_price: number | null;
+ median_qmprice: number | null;
+ mean_total_price: number | null;
+ mean_qmprice: number | null;
+}
+
// Decision types
// `seen` is a soft-hide: the listing is removed from the main list by default
// but can be re-revealed via the "Show hidden" filter toggle. Distinct from
diff --git a/models/listing.py b/models/listing.py
index 9d969ec..83715f7 100644
--- a/models/listing.py
+++ b/models/listing.py
@@ -7,6 +7,7 @@ import json
from typing import Any, Dict, List
from pydantic import BaseModel, Field as PydanticField, model_validator
from rec import routing
+from sqlalchemy import UniqueConstraint
from sqlmodel import JSON, TEXT, SQLModel, Field
@@ -92,6 +93,14 @@ class Listing(SQLModel, table=False):
sa_type=TEXT, nullable=True, default=None
) # Store as JSON string for simplicity
+ # Per-listing price-trend snapshot maintained by the daily aggregator.
+ # `price_14d_ago` is the historical price ~14 days before the most recent
+ # aggregator run (sourced from price_history_json). `price_change_pct_14d`
+ # is the % change from that to the current `price` (positive=up, neg=down).
+ # Both are null when the listing has no entry that old in its history.
+ price_14d_ago: float | None = Field(default=None, nullable=True)
+ price_change_pct_14d: float | None = Field(default=None, nullable=True)
+
@property
def is_removed(self) -> bool:
if not self.additional_info:
@@ -176,6 +185,34 @@ class BuyListing(Listing, table=True):
) # in years, e.g., 90, 80, etc.
+class DailyListingAggregate(SQLModel, table=True):
+ """One row per (snapshot_date, listing_type, bedroom band).
+
+ Written daily by `compute_daily_market_aggregates_task` after the scrape
+ settles. Drives the `MarketTrendStrip` UI ("get a vibe of the market").
+ The (date, listing_type, min_bedrooms, max_bedrooms) tuple is unique;
+ the aggregator upserts rather than appends so re-running on the same day
+ refreshes the snapshot instead of duplicating it.
+ """
+ __table_args__ = (
+ UniqueConstraint(
+ "snapshot_date", "listing_type", "min_bedrooms", "max_bedrooms",
+ name="uq_aggregate_date_filter",
+ ),
+ )
+
+ id: int | None = Field(default=None, primary_key=True)
+ snapshot_date: datetime = Field(nullable=False, index=True)
+ listing_type: str = Field(nullable=False) # "RENT" or "BUY"
+ min_bedrooms: int = Field(nullable=False)
+ max_bedrooms: int = Field(nullable=False)
+ listing_count: int = Field(nullable=False)
+ median_total_price: float | None = Field(default=None, nullable=True)
+ median_qmprice: float | None = Field(default=None, nullable=True)
+ mean_total_price: float | None = Field(default=None, nullable=True)
+ mean_qmprice: float | None = Field(default=None, nullable=True)
+
+
@dataclass(frozen=True)
class DestinationMode:
destination_address: str
diff --git a/services/market_aggregator.py b/services/market_aggregator.py
new file mode 100644
index 0000000..7ce511a
--- /dev/null
+++ b/services/market_aggregator.py
@@ -0,0 +1,287 @@
+"""Daily market-trend aggregator.
+
+Two outputs per run:
+
+1. Per-listing trend columns. For each row in RentListing / BuyListing we
+ parse `price_history_json` and find the price entry whose `last_seen` was
+ closest to `lookback_days` ago. The current price and that historical
+ price land on `price_14d_ago` / `price_change_pct_14d` for the
+ PropertyCard badge to render.
+
+2. Aggregate market snapshot. For each configured (listing_type, bedroom
+ band) we compute median/mean/count over the CURRENT listing pool and
+ upsert one row in `dailylistingaggregate` keyed on today's date. The
+ `MarketTrendStrip` UI consumes these rows.
+
+Both steps are idempotent — re-running on the same day refreshes the
+snapshot rather than appending. Designed to fire daily ~04:00 UTC (1h
+after the 03:00 RENT scrape so the data is fresh).
+"""
+from __future__ import annotations
+
+import json
+import logging
+import time
+from datetime import datetime, timedelta
+from statistics import mean, median
+from typing import Iterable
+
+from sqlalchemy import Engine
+from sqlmodel import Session, select
+
+from models.listing import (
+ BuyListing,
+ DailyListingAggregate,
+ PriceHistoryItem,
+ RentListing,
+)
+
+logger = logging.getLogger("uvicorn")
+
+# Default scope: the user's daily filter (1-2 bed, both listing types).
+DEFAULT_BEDROOM_BANDS: tuple[tuple[int, int], ...] = ((1, 2),)
+DEFAULT_LISTING_TYPES: tuple[str, ...] = ("RENT", "BUY")
+
+# Trend lookback window for the per-listing badge. Surfaces price moves
+# that happened in the last fortnight (long enough for prices to actually
+# settle, short enough to feel current).
+DEFAULT_LOOKBACK_DAYS = 14
+
+
+def _parse_history(price_history_json: str | None) -> list[PriceHistoryItem]:
+ if not price_history_json:
+ return []
+ try:
+ raw = json.loads(price_history_json)
+ except (ValueError, TypeError):
+ return []
+ out: list[PriceHistoryItem] = []
+ for item in raw:
+ try:
+ out.append(
+ PriceHistoryItem(
+ first_seen=datetime.fromisoformat(item["first_seen"]),
+ last_seen=datetime.fromisoformat(item["last_seen"]),
+ price=float(item["price"]),
+ )
+ )
+ except (KeyError, ValueError, TypeError):
+ continue
+ return out
+
+
+def _price_at_or_before(
+ history: list[PriceHistoryItem], cutoff: datetime
+) -> float | None:
+ """Return the price of the entry whose `last_seen` is closest to (but
+ not after) `cutoff`. Returns None if no entry that old exists.
+
+ History is in chronological order; we scan and keep the latest match.
+ """
+ found: float | None = None
+ for item in history:
+ if item.last_seen <= cutoff:
+ found = item.price
+ else:
+ break
+ return found
+
+
+def compute_trend_for_listing(
+ price_history_json: str | None,
+ current_price: float | None,
+ *,
+ lookback_days: int = DEFAULT_LOOKBACK_DAYS,
+ now: datetime | None = None,
+) -> tuple[float | None, float | None]:
+ """Return `(price_n_days_ago, change_pct)` for one listing.
+
+ `change_pct` is `(current - past) / past * 100` rounded to 2dp; positive
+ = price went up, negative = down. Both are None when there's no entry
+ that old in history or current price is unusable.
+ """
+ if not isinstance(current_price, (int, float)) or current_price <= 0:
+ return None, None
+ cutoff = (now or datetime.utcnow()) - timedelta(days=lookback_days)
+ history = _parse_history(price_history_json)
+ past = _price_at_or_before(history, cutoff)
+ if past is None or past <= 0:
+ return None, None
+ pct = round((current_price - past) / past * 100.0, 2)
+ return past, pct
+
+
+def update_per_listing_trend(
+ engine: Engine,
+ *,
+ lookback_days: int = DEFAULT_LOOKBACK_DAYS,
+ batch_size: int = 1000,
+ now: datetime | None = None,
+) -> dict[str, int]:
+ """Walk every RentListing + BuyListing, recompute trend columns, write."""
+ counts = {"rent_updated": 0, "buy_updated": 0}
+ t0 = time.monotonic()
+ for model_name, model in (("rent", RentListing), ("buy", BuyListing)):
+ with Session(engine) as session:
+ offset = 0
+ while True:
+ stmt = select(model).offset(offset).limit(batch_size)
+ rows: list = list(session.exec(stmt).all())
+ if not rows:
+ break
+ for row in rows:
+ past, pct = compute_trend_for_listing(
+ row.price_history_json,
+ row.price,
+ lookback_days=lookback_days,
+ now=now,
+ )
+ if row.price_14d_ago != past or row.price_change_pct_14d != pct:
+ row.price_14d_ago = past
+ row.price_change_pct_14d = pct
+ session.add(row)
+ counts[f"{model_name}_updated"] += 1
+ session.commit()
+ if len(rows) < batch_size:
+ break
+ offset += batch_size
+ logger.info(
+ "Per-listing trend updated in %.1fs: rent=%d buy=%d (lookback=%dd)",
+ time.monotonic() - t0,
+ counts["rent_updated"],
+ counts["buy_updated"],
+ lookback_days,
+ )
+ return counts
+
+
+def _stats(values: Iterable[float]) -> dict[str, float | None]:
+ """Median + mean over the valid positive entries; null for empty input."""
+ finite = [v for v in values if isinstance(v, (int, float)) and v > 0]
+ if not finite:
+ return {"median": None, "mean": None, "count": 0}
+ return {
+ "median": float(median(finite)),
+ "mean": round(float(mean(finite)), 2),
+ "count": len(finite),
+ }
+
+
+def compute_aggregate_snapshot(
+ engine: Engine,
+ *,
+ listing_types: tuple[str, ...] = DEFAULT_LISTING_TYPES,
+ bedroom_bands: tuple[tuple[int, int], ...] = DEFAULT_BEDROOM_BANDS,
+ snapshot_date: datetime | None = None,
+) -> list[DailyListingAggregate]:
+ """Compute one aggregate row per (listing_type * bedroom band) and
+ upsert it onto today's `snapshot_date`. Returns the persisted rows.
+
+ Uses an `INSERT ... ON DUPLICATE KEY UPDATE` so re-running on the same
+ day refreshes the row in place — no duplicates, no DELETE.
+ """
+ today = snapshot_date or datetime.utcnow().replace(
+ hour=0, minute=0, second=0, microsecond=0
+ )
+ written: list[DailyListingAggregate] = []
+ dialect = engine.dialect.name
+ with Session(engine) as session:
+ for listing_type in listing_types:
+ model = RentListing if listing_type == "RENT" else BuyListing
+ for min_bed, max_bed in bedroom_bands:
+ stmt = select(model.price, model.square_meters).where(
+ model.number_of_bedrooms >= min_bed,
+ model.number_of_bedrooms <= max_bed,
+ )
+ rows = list(session.exec(stmt).all())
+ prices = [r[0] for r in rows]
+ qmprices = [
+ (r[0] / r[1])
+ for r in rows
+ if r[1] is not None and r[1] > 0
+ ]
+ price_stats = _stats(prices)
+ qm_stats = _stats(qmprices)
+ values = {
+ "snapshot_date": today,
+ "listing_type": listing_type,
+ "min_bedrooms": min_bed,
+ "max_bedrooms": max_bed,
+ "listing_count": price_stats["count"],
+ "median_total_price": price_stats["median"],
+ "median_qmprice": qm_stats["median"],
+ "mean_total_price": price_stats["mean"],
+ "mean_qmprice": qm_stats["mean"],
+ }
+ if dialect == "mysql":
+ from sqlalchemy.dialects.mysql import insert as mysql_insert
+ stmt_ins = mysql_insert(DailyListingAggregate).values(**values)
+ stmt_ins = stmt_ins.on_duplicate_key_update(
+ listing_count=stmt_ins.inserted.listing_count,
+ median_total_price=stmt_ins.inserted.median_total_price,
+ median_qmprice=stmt_ins.inserted.median_qmprice,
+ mean_total_price=stmt_ins.inserted.mean_total_price,
+ mean_qmprice=stmt_ins.inserted.mean_qmprice,
+ )
+ session.execute(stmt_ins)
+ else:
+ from sqlalchemy.dialects.sqlite import insert as sqlite_insert
+ stmt_ins = sqlite_insert(DailyListingAggregate).values(**values)
+ stmt_ins = stmt_ins.on_conflict_do_update(
+ index_elements=[
+ "snapshot_date", "listing_type",
+ "min_bedrooms", "max_bedrooms",
+ ],
+ set_={
+ "listing_count": stmt_ins.excluded.listing_count,
+ "median_total_price": stmt_ins.excluded.median_total_price,
+ "median_qmprice": stmt_ins.excluded.median_qmprice,
+ "mean_total_price": stmt_ins.excluded.mean_total_price,
+ "mean_qmprice": stmt_ins.excluded.mean_qmprice,
+ },
+ )
+ session.execute(stmt_ins)
+ session.commit()
+ row = session.exec(
+ select(DailyListingAggregate).where(
+ DailyListingAggregate.snapshot_date == today,
+ DailyListingAggregate.listing_type == listing_type,
+ DailyListingAggregate.min_bedrooms == min_bed,
+ DailyListingAggregate.max_bedrooms == max_bed,
+ )
+ ).first()
+ if row is not None:
+ written.append(row)
+ logger.info(
+ "Aggregate %s %d-%d on %s: count=%s median=%s/%s mean=%s/%s",
+ listing_type, min_bed, max_bed, today.date(),
+ price_stats["count"],
+ price_stats["median"], qm_stats["median"],
+ price_stats["mean"], qm_stats["mean"],
+ )
+ return written
+
+
+def fetch_trend_series(
+ engine: Engine,
+ *,
+ listing_type: str,
+ min_bedrooms: int,
+ max_bedrooms: int,
+ days: int = 30,
+) -> list[DailyListingAggregate]:
+ """Return the aggregate rows for the last `days` days, ordered ascending
+ by date. Empty list when no rows match — the strip handles that case."""
+ cutoff = datetime.utcnow() - timedelta(days=days)
+ with Session(engine) as session:
+ stmt = (
+ select(DailyListingAggregate)
+ .where(
+ DailyListingAggregate.listing_type == listing_type,
+ DailyListingAggregate.min_bedrooms == min_bedrooms,
+ DailyListingAggregate.max_bedrooms == max_bedrooms,
+ DailyListingAggregate.snapshot_date >= cutoff,
+ )
+ .order_by(DailyListingAggregate.snapshot_date)
+ )
+ return list(session.exec(stmt).all())
diff --git a/tasks/listing_tasks.py b/tasks/listing_tasks.py
index 808823c..51a70c0 100644
--- a/tasks/listing_tasks.py
+++ b/tasks/listing_tasks.py
@@ -656,3 +656,15 @@ def setup_periodic_tasks(sender, **kwargs):
dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
name=schedule.name,
)
+
+ # Daily market aggregator — fires after the 03:00 RENT scrape so the
+ # snapshot reflects today's freshly-pulled data. Imported lazily to
+ # avoid a circular import (market_tasks imports celery_app, which
+ # imports listing_tasks via the include list).
+ from tasks.market_tasks import compute_daily_market_aggregates_task
+ celery_logger.info("Registering periodic task: daily-market-aggregator at 4:00")
+ sender.add_periodic_task(
+ crontab(minute="0", hour="4", day_of_week="*"),
+ compute_daily_market_aggregates_task.s(),
+ name="daily-market-aggregator",
+ )
diff --git a/tasks/market_tasks.py b/tasks/market_tasks.py
new file mode 100644
index 0000000..e57fd79
--- /dev/null
+++ b/tasks/market_tasks.py
@@ -0,0 +1,57 @@
+"""Daily market-trend aggregator Celery task.
+
+Fires daily at 04:00 UTC — one hour after the 03:00 RENT scrape so the
+data is fresh. Calls into `services.market_aggregator` to:
+ 1. Recompute per-listing `price_14d_ago` / `price_change_pct_14d`.
+ 2. Upsert the per-(listing_type, bedroom-band) row in
+ `dailylistingaggregate` for today's snapshot.
+
+Idempotent: re-running on the same day refreshes both surfaces in place.
+"""
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from celery_app import app
+from database import engine
+from services import market_aggregator
+
+celery_logger = logging.getLogger("celery_app")
+
+
+@app.task(
+ bind=True,
+ name="tasks.market_tasks.compute_daily_market_aggregates_task",
+ time_limit=3600,
+ soft_time_limit=3500,
+ acks_late=True,
+)
+def compute_daily_market_aggregates_task(self: Any) -> dict[str, Any]:
+ """Run both stages of the daily market aggregator."""
+ celery_logger.info("Starting daily market aggregator (task=%s)", self.request.id)
+ per_listing = market_aggregator.update_per_listing_trend(engine)
+ aggregates = market_aggregator.compute_aggregate_snapshot(engine)
+ result = {
+ "status": "ok",
+ "per_listing": per_listing,
+ "aggregates": [
+ {
+ "snapshot_date": a.snapshot_date.isoformat(),
+ "listing_type": a.listing_type,
+ "min_bedrooms": a.min_bedrooms,
+ "max_bedrooms": a.max_bedrooms,
+ "listing_count": a.listing_count,
+ "median_total_price": a.median_total_price,
+ "median_qmprice": a.median_qmprice,
+ }
+ for a in aggregates
+ ],
+ }
+ celery_logger.info(
+ "Daily market aggregator complete: rent_updated=%s buy_updated=%s aggregates=%s",
+ per_listing.get("rent_updated"),
+ per_listing.get("buy_updated"),
+ len(aggregates),
+ )
+ return result
diff --git a/tests/unit/test_market_aggregator.py b/tests/unit/test_market_aggregator.py
new file mode 100644
index 0000000..176f466
--- /dev/null
+++ b/tests/unit/test_market_aggregator.py
@@ -0,0 +1,163 @@
+"""Unit tests for the daily market aggregator."""
+from __future__ import annotations
+
+import json
+from datetime import datetime, timedelta
+
+import pytest
+from sqlalchemy import create_engine
+from sqlmodel import SQLModel, Session
+
+from models.listing import (
+ DailyListingAggregate,
+ FurnishType,
+ Listing,
+ ListingSite,
+ RentListing,
+)
+from services import market_aggregator
+
+
+# --- compute_trend_for_listing --------------------------------------------
+
+def _hist(entries: list[tuple[datetime, float]]) -> str:
+ return json.dumps(
+ [
+ {
+ "first_seen": fs.isoformat(),
+ "last_seen": fs.isoformat(),
+ "price": p,
+ }
+ for fs, p in entries
+ ]
+ )
+
+
+class TestComputeTrendForListing:
+ def test_null_history_returns_nones(self) -> None:
+ past, pct = market_aggregator.compute_trend_for_listing(None, 2500)
+ assert past is None and pct is None
+
+ def test_empty_history_returns_nones(self) -> None:
+ past, pct = market_aggregator.compute_trend_for_listing("[]", 2500)
+ assert past is None and pct is None
+
+ def test_malformed_history_returns_nones(self) -> None:
+ past, pct = market_aggregator.compute_trend_for_listing("not json", 2500)
+ assert past is None and pct is None
+
+ def test_history_only_recent_returns_nones(self) -> None:
+ """History exists but no entry old enough."""
+ now = datetime(2026, 5, 16, 12, 0, 0)
+ history = _hist([(now - timedelta(days=2), 2500)])
+ past, pct = market_aggregator.compute_trend_for_listing(
+ history, 2500, lookback_days=14, now=now,
+ )
+ assert past is None and pct is None
+
+ def test_price_dropped(self) -> None:
+ now = datetime(2026, 5, 16, 12, 0, 0)
+ history = _hist([
+ (now - timedelta(days=30), 2800),
+ (now - timedelta(days=20), 2700),
+ (now - timedelta(days=10), 2500),
+ ])
+ # Lookback 14d → cutoff at day -14, latest entry on/before is day -20 (price 2700).
+ past, pct = market_aggregator.compute_trend_for_listing(
+ history, 2500, lookback_days=14, now=now,
+ )
+ assert past == 2700
+ assert pct == round((2500 - 2700) / 2700 * 100, 2)
+ assert pct < 0
+
+ def test_price_rose(self) -> None:
+ now = datetime(2026, 5, 16, 12, 0, 0)
+ history = _hist([(now - timedelta(days=20), 2000)])
+ past, pct = market_aggregator.compute_trend_for_listing(
+ history, 2200, lookback_days=14, now=now,
+ )
+ assert past == 2000
+ assert pct == 10.0
+
+ def test_current_price_zero_returns_nones(self) -> None:
+ past, pct = market_aggregator.compute_trend_for_listing(
+ _hist([(datetime(2026, 1, 1), 2500)]), 0,
+ )
+ assert past is None and pct is None
+
+
+# --- _stats ----------------------------------------------------------------
+
+class TestStats:
+ def test_empty(self) -> None:
+ out = market_aggregator._stats([])
+ assert out == {"median": None, "mean": None, "count": 0}
+
+ def test_filters_nonpositive(self) -> None:
+ out = market_aggregator._stats([0, -1, None, 2000, 3000]) # type: ignore[list-item]
+ assert out["count"] == 2
+ assert out["median"] == 2500
+ assert out["mean"] == 2500.0
+
+ def test_single_value(self) -> None:
+ out = market_aggregator._stats([1500])
+ assert out == {"median": 1500.0, "mean": 1500.0, "count": 1}
+
+
+# --- compute_aggregate_snapshot — integration on SQLite ------------------
+
+@pytest.fixture
+def engine_with_seed():
+ """In-memory SQLite seeded with a tiny RentListing set in the 1-2 bed band."""
+ engine = create_engine("sqlite://")
+ SQLModel.metadata.create_all(engine)
+ with Session(engine) as session:
+ for i, (rooms, price, sqm) in enumerate(
+ [(1, 2000, 40), (1, 2500, 50), (2, 3000, 60), (2, 4000, 80), (3, 5000, 100)],
+ start=1,
+ ):
+ session.add(
+ RentListing(
+ id=i, price=price, number_of_bedrooms=rooms,
+ square_meters=sqm, longitude=0.0, latitude=0.0,
+ price_history_json="[]", listing_site=ListingSite.RIGHTMOVE,
+ last_seen=datetime(2026, 5, 16),
+ furnish_type=FurnishType.UNKNOWN,
+ )
+ )
+ session.commit()
+ return engine
+
+
+class TestComputeAggregateSnapshot:
+ def test_writes_one_row_per_band(self, engine_with_seed) -> None:
+ rows = market_aggregator.compute_aggregate_snapshot(
+ engine_with_seed,
+ listing_types=("RENT",),
+ bedroom_bands=((1, 2),),
+ snapshot_date=datetime(2026, 5, 16),
+ )
+ assert len(rows) == 1
+ row = rows[0]
+ assert row.listing_count == 4 # excludes the 3-bed
+ assert row.median_total_price == 2750.0 # median of [2000,2500,3000,4000]
+ assert row.mean_total_price == 2875.0
+ # qmprice values: 50, 50, 50, 50 → median 50
+ assert row.median_qmprice == 50.0
+
+ def test_upsert_idempotent(self, engine_with_seed) -> None:
+ market_aggregator.compute_aggregate_snapshot(
+ engine_with_seed,
+ listing_types=("RENT",),
+ bedroom_bands=((1, 2),),
+ snapshot_date=datetime(2026, 5, 16),
+ )
+ market_aggregator.compute_aggregate_snapshot(
+ engine_with_seed,
+ listing_types=("RENT",),
+ bedroom_bands=((1, 2),),
+ snapshot_date=datetime(2026, 5, 16),
+ )
+ with Session(engine_with_seed) as session:
+ count = session.query(DailyListingAggregate).count()
+ assert count == 1 # no duplicate row
diff --git a/ui_exporter.py b/ui_exporter.py
index 179fba5..1fbbbf5 100644
--- a/ui_exporter.py
+++ b/ui_exporter.py
@@ -143,6 +143,10 @@ def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, A
"price_history": [item.to_dict() for item in listing.price_history],
"agency": listing.agency,
"available_from": property_info.get("letDateAvailable", None),
+ # Per-listing trend snapshot (populated by the daily aggregator —
+ # null until the aggregator has seen this listing at least once).
+ "price_14d_ago": listing.price_14d_ago,
+ "price_change_pct_14d": listing.price_change_pct_14d,
}
if isinstance(listing, BuyListing):