From 49e3514780c666df5ea712a5f0778c5dcd56019d Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 16 May 2026 12:02:25 +0000 Subject: [PATCH] wrongmove: daily price-trend monitoring (per-listing badge + macro strip) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two surfaces wired up so the user can "get a vibe of the market": **Per-listing** — each PropertyCard now shows a small pill next to the price when the listing's total_price moved >=1% over a 14-day lookback (e.g. "↓ £200 (-4%) in 14d"). Drops render green, rises render red. Computed from `price_history_json` by the daily aggregator and denormalised onto the listing row so the streaming endpoint just passes it through. **Macro** — new always-visible inline strip above the chip strip showing today's median total price, median £/m², and listing count for the current filter's bedroom band, each with a 30-day % delta: "Rent · 1-2 bed · 30d: Median £2,500 ↓ -4% · £/m² £50 ↓ -2% · Listings 4,200 ↑ +5%". Both data sources are populated daily at 04:00 UTC by a new Celery beat task that fires 1h after the 03:00 RENT scrape and feeds two sinks: a per-listing update pass and an upsert to a new `dailylistingaggregate` table keyed on (snapshot_date, listing_type, min_bedrooms, max_bedrooms). ## Backend - `models/listing.py`: Listing parent gains `price_14d_ago` + `price_change_pct_14d` nullable floats (inherited by RentListing/BuyListing). New `DailyListingAggregate` table model with unique constraint on (date, type, min_bed, max_bed). - Alembic `a8b9c0d1e2f3`: adds the two columns to both listing tables and creates the aggregate table + date index. - `services/market_aggregator.py` (new): `compute_trend_for_listing`, `update_per_listing_trend` (batched, idempotent), `_stats` (median + mean filtered to positive finite values), `compute_aggregate_snapshot` (dialect-aware MySQL / SQLite upsert), `fetch_trend_series` (range query for the API). 
- `tasks/market_tasks.py` (new): `compute_daily_market_aggregates_task` Celery task wrapping both stages. - `tasks/listing_tasks.py:setup_periodic_tasks`: registers the daily task at 04:00 UTC alongside the existing scrape schedules. - `celery_app.py`: includes the new tasks module. - `api/app.py`: new `GET /api/market_trend?listing_type=&min_bedrooms=&max_bedrooms=&days=` endpoint returning the daily series. - `ui_exporter.py`: GeoJSON feature properties now carry `price_14d_ago` and `price_change_pct_14d` so the frontend can render the badge without an extra round-trip. ## Frontend - `types/index.ts`: new `MarketTrendPoint`; `PropertyProperties` gains the two optional trend fields. - `components/PropertyCard.tsx`: derived `trendBadge` (>=1% threshold, null-safe) rendered as a small pill on both card variants. - `hooks/useMarketTrend.ts` (new): fetches the trend series, derives current-vs-oldest deltas per metric (% change rounded to 1dp). - `components/MarketTrendStrip.tsx` (new): compact inline strip with three metric cells. Hidden when the aggregator hasn't produced any rows yet (graceful start during the first week post-launch). - `App.tsx`: renders the strip above the chip strip whenever the active queryParameters are known. ## Tests - pytest: 10 new (trend math edge cases including null history, malformed JSON, only-recent entries, drops, rises, zero current price; `_stats` empty / nonpositive filtering; upsert idempotency on an in-memory SQLite seed). 34 decision + aggregator tests pass. - vitest: 8 new (useMarketTrend fetch URL, two-point delta, single-point null delta, empty series; PropertyCard trend badge arrow direction + sign for drops/rises, noise threshold, null guard). 229 tests pass total, tsc clean. 
Co-Authored-By: Claude Opus 4.7 --- ...3_add_price_trend_columns_and_aggregate.py | 68 +++++ api/app.py | 47 +++ celery_app.py | 2 +- frontend/src/App.tsx | 12 + frontend/src/components/MarketTrendStrip.tsx | 95 ++++++ frontend/src/components/PropertyCard.tsx | 38 +++ .../__tests__/PropertyCard.test.tsx | 47 +++ .../hooks/__tests__/useMarketTrend.test.tsx | 94 ++++++ frontend/src/hooks/useMarketTrend.ts | 93 ++++++ frontend/src/types/index.ts | 14 + models/listing.py | 37 +++ services/market_aggregator.py | 287 ++++++++++++++++++ tasks/listing_tasks.py | 12 + tasks/market_tasks.py | 57 ++++ tests/unit/test_market_aggregator.py | 163 ++++++++++ ui_exporter.py | 4 + 16 files changed, 1069 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/a8b9c0d1e2f3_add_price_trend_columns_and_aggregate.py create mode 100644 frontend/src/components/MarketTrendStrip.tsx create mode 100644 frontend/src/hooks/__tests__/useMarketTrend.test.tsx create mode 100644 frontend/src/hooks/useMarketTrend.ts create mode 100644 services/market_aggregator.py create mode 100644 tasks/market_tasks.py create mode 100644 tests/unit/test_market_aggregator.py diff --git a/alembic/versions/a8b9c0d1e2f3_add_price_trend_columns_and_aggregate.py b/alembic/versions/a8b9c0d1e2f3_add_price_trend_columns_and_aggregate.py new file mode 100644 index 0000000..54a505a --- /dev/null +++ b/alembic/versions/a8b9c0d1e2f3_add_price_trend_columns_and_aggregate.py @@ -0,0 +1,68 @@ +"""add price trend columns and daily market aggregate table + +Revision ID: a8b9c0d1e2f3 +Revises: f7a8b9c0d1e2 +Create Date: 2026-05-16 12:00:00.000000 + +Wires the price-monitoring feature: +- Per-listing trend columns (`price_14d_ago`, `price_change_pct_14d`) on + RentListing and BuyListing. Both are nullable — they stay empty for + listings with no entry that old in their price_history_json. 
+- A new `dailylistingaggregate` table keyed on + (snapshot_date, listing_type, min_bedrooms, max_bedrooms) with median / + mean / count for the daily-filter scope. One row per day per band. +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'a8b9c0d1e2f3' +down_revision: Union[str, None] = 'f7a8b9c0d1e2' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + for table in ("rentlisting", "buylisting"): + op.add_column(table, sa.Column("price_14d_ago", sa.Float(), nullable=True)) + op.add_column( + table, sa.Column("price_change_pct_14d", sa.Float(), nullable=True) + ) + + op.create_table( + "dailylistingaggregate", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("snapshot_date", sa.DateTime(), nullable=False), + sa.Column("listing_type", sa.String(length=8), nullable=False), + sa.Column("min_bedrooms", sa.Integer(), nullable=False), + sa.Column("max_bedrooms", sa.Integer(), nullable=False), + sa.Column("listing_count", sa.Integer(), nullable=False), + sa.Column("median_total_price", sa.Float(), nullable=True), + sa.Column("median_qmprice", sa.Float(), nullable=True), + sa.Column("mean_total_price", sa.Float(), nullable=True), + sa.Column("mean_qmprice", sa.Float(), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "snapshot_date", "listing_type", "min_bedrooms", "max_bedrooms", + name="uq_aggregate_date_filter", + ), + ) + op.create_index( + "ix_dailylistingaggregate_snapshot_date", + "dailylistingaggregate", + ["snapshot_date"], + ) + + +def downgrade() -> None: + op.drop_index( + "ix_dailylistingaggregate_snapshot_date", + table_name="dailylistingaggregate", + ) + op.drop_table("dailylistingaggregate") + for table in ("rentlisting", "buylisting"): + op.drop_column(table, "price_change_pct_14d") + op.drop_column(table, "price_14d_ago") diff 
--git a/api/app.py b/api/app.py index 1d9dcfc..2dfb3be 100644 --- a/api/app.py +++ b/api/app.py @@ -698,6 +698,53 @@ async def get_districts( return district_service.get_all_districts() +class MarketTrendPoint(BaseModel): + """One day of aggregated market stats for the (listing_type, bed-band).""" + snapshot_date: str + listing_count: int + median_total_price: float | None + median_qmprice: float | None + mean_total_price: float | None + mean_qmprice: float | None + + +@app.get("/api/market_trend", response_model=list[MarketTrendPoint]) +async def get_market_trend( + user: Annotated[User, Depends(get_current_user)], + listing_type: str = Query("RENT", description="RENT or BUY"), + min_bedrooms: int = Query(1, ge=0), + max_bedrooms: int = Query(2, ge=0), + days: int = Query(30, ge=1, le=365, description="Lookback window in days"), +) -> list[MarketTrendPoint]: + """Daily aggregate snapshots for the requested (type × bed-band) over + the last N days. Powers the MarketTrendStrip UI.""" + from services.market_aggregator import fetch_trend_series # noqa: PLC0415 + + if listing_type not in {"RENT", "BUY"}: + raise HTTPException(status_code=400, detail="listing_type must be RENT or BUY") + if min_bedrooms > max_bedrooms: + raise HTTPException(status_code=400, detail="min_bedrooms must be <= max_bedrooms") + + rows = fetch_trend_series( + engine, + listing_type=listing_type, + min_bedrooms=min_bedrooms, + max_bedrooms=max_bedrooms, + days=days, + ) + return [ + MarketTrendPoint( + snapshot_date=r.snapshot_date.isoformat(), + listing_count=r.listing_count, + median_total_price=r.median_total_price, + median_qmprice=r.median_qmprice, + mean_total_price=r.mean_total_price, + mean_qmprice=r.mean_qmprice, + ) + for r in rows + ] + + class ListingDetailResponse(BaseModel): id: int price: float diff --git a/celery_app.py b/celery_app.py index 148a7e5..185426b 100644 --- a/celery_app.py +++ b/celery_app.py @@ -15,7 +15,7 @@ app = Celery( "celery_app", 
broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"), - include=["tasks.listing_tasks", "tasks.poi_tasks"], + include=["tasks.listing_tasks", "tasks.poi_tasks", "tasks.market_tasks"], ) # Keep broker / result-backend connections alive when sitting behind an diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 03eb6dd..1528ec3 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -11,6 +11,7 @@ import { Map } from './components/Map'; import { type ParameterValues, DEFAULT_FILTER_VALUES, Metric, ListingType } from './components/FilterPanel'; import { FilterBar, type FilterBarFormHandle } from './components/FilterBar'; import { FilterChips } from './components/FilterChips'; +import { MarketTrendStrip } from './components/MarketTrendStrip'; import { VisualizationCard } from './components/VisualizationCard'; import { Header } from './components/Header'; import { StatsBar } from './components/StatsBar'; @@ -856,6 +857,17 @@ function AppContent() { /> + {/* Macro market-trend strip — always visible, gives a "vibe of + the market" for the current filter's bedroom band. */} + {queryParameters && ( + + )} + {/* Active Filter Chips */} {queryParameters && ( {EM_DASH}; + + const formatted = (() => { + if (kind === 'price') return formatPrice(delta.current); + if (kind === 'pricePerSqm') return `£${Math.round(delta.current)}/m²`; + return delta.current.toLocaleString(); + })(); + + if (delta.changePct === null || delta.previous === null) { + return {formatted}; + } + + // Drops in price/£m² are good (green); rises are red. For listing count + // direction has no inherent good/bad — just show the move neutrally. + const isPriceMetric = kind === 'price' || kind === 'pricePerSqm'; + const dropped = delta.changePct < 0; + const colour = isPriceMetric + ? (Math.abs(delta.changePct) < 0.1 + ? 'text-muted-foreground' + : dropped + ? 
'text-[var(--deal-good)]' + : 'text-[var(--deal-above)]') + : 'text-muted-foreground'; + const arrow = Math.abs(delta.changePct) < 0.1 ? '·' : dropped ? '↓' : '↑'; + const sign = delta.changePct > 0 ? '+' : ''; + return ( + + {formatted}{' '} + + {arrow} {sign}{delta.changePct}% + + + ); +} + +export function MarketTrendStrip({ + user, + listingType, + minBedrooms, + maxBedrooms, + days = 30, +}: Props) { + const { series, isLoading, deltas } = useMarketTrend( + user, + listingType, + minBedrooms, + maxBedrooms, + days, + ); + + if (isLoading && series.length === 0) { + return ( +
+ Loading market trend… +
+ ); + } + // Hide entirely when we have no data at all (the daily aggregator hasn't + // produced any rows yet — common during the first week post-launch). + if (series.length === 0) return null; + + const label = `${listingType === 'RENT' ? 'Rent' : 'Buy'} · ${minBedrooms}-${maxBedrooms} bed · ${days}d`; + + return ( +
+ {label}: + Median + · + £/m² + · + Listings +
+ ); +} diff --git a/frontend/src/components/PropertyCard.tsx b/frontend/src/components/PropertyCard.tsx index dd1b31b..9702124 100644 --- a/frontend/src/components/PropertyCard.tsx +++ b/frontend/src/components/PropertyCard.tsx @@ -228,6 +228,28 @@ export function PropertyCard({ ? { dotColor: 'bg-[var(--deal-above)]', label: 'Above avg' } : null; + // Per-listing trend badge — surfaces when the daily aggregator has + // computed a non-trivial price move over the 14d lookback window. + // Threshold = 1% so we don't render noise from rounding or minor jitters. + const trendBadge = (() => { + const pct = property.price_change_pct_14d; + const past = property.price_14d_ago; + if (typeof pct !== 'number' || !isFiniteNumber(pct)) return null; + if (Math.abs(pct) < 1) return null; + const dropped = pct < 0; + const deltaAbs = isFiniteNumber(past) && isFiniteNumber(property.total_price) + ? Math.abs(property.total_price - past) + : null; + return { + dropped, + label: `${dropped ? '↓' : '↑'} ${deltaAbs !== null ? formatPrice(deltaAbs) : ''} (${pct > 0 ? '+' : ''}${pct}%) in 14d`.replace(/\s+/g, ' ').trim(), + // Drops are good for the buyer; greens for drop, reds for rise. + className: dropped + ? 
'text-[var(--deal-good)] bg-[var(--deal-good)]/10 border-[var(--deal-good)]/40' + : 'text-[var(--deal-above)] bg-[var(--deal-above)]/10 border-[var(--deal-above)]/40', + }; + })(); + const handleClick = () => { onClick?.(); }; @@ -266,6 +288,14 @@ export function PropertyCard({ {priceIndicator && ( {priceIndicator.label} )} + {trendBadge && ( + + {trendBadge.label} + + )} {/* Key metrics on one line */} @@ -344,6 +374,14 @@ export function PropertyCard({ {priceIndicator && ( {priceIndicator.label} )} + {trendBadge && ( + + {trendBadge.label} + + )} {/* Key metrics on one line */} diff --git a/frontend/src/components/__tests__/PropertyCard.test.tsx b/frontend/src/components/__tests__/PropertyCard.test.tsx index 4d7bcc5..c35da81 100644 --- a/frontend/src/components/__tests__/PropertyCard.test.tsx +++ b/frontend/src/components/__tests__/PropertyCard.test.tsx @@ -223,4 +223,51 @@ describe('PropertyCard', () => { expect(container.querySelector('button[aria-label="Previous photo"]')).not.toBeInTheDocument(); expect(container.querySelector('button[aria-label="Next photo"]')).not.toBeInTheDocument(); }); + + // Price-trend badge — renders for moves >=1%, hidden for noise / nulls. + it('renders a "↓" trend badge when price dropped >1% in 14d', () => { + const property = { + ...createMockProperty({ total_price: 2400 }), + price_14d_ago: 2500, + price_change_pct_14d: -4, + } as unknown as PropertyProperties; + const { container } = render(); + const text = container.textContent ?? ''; + expect(text).toMatch(/↓/); + expect(text).toMatch(/-4%/); + expect(text).toMatch(/14d/); + }); + + it('renders a "↑" trend badge when price rose >1% in 14d', () => { + const property = { + ...createMockProperty({ total_price: 2200 }), + price_14d_ago: 2000, + price_change_pct_14d: 10, + } as unknown as PropertyProperties; + const { container } = render(); + const text = container.textContent ?? 
''; + expect(text).toMatch(/↑/); + expect(text).toMatch(/\+10%/); + }); + + it('omits the trend badge when the move is < 1% (noise threshold)', () => { + const property = { + ...createMockProperty({ total_price: 2510 }), + price_14d_ago: 2500, + price_change_pct_14d: 0.4, + } as unknown as PropertyProperties; + const { container } = render(); + expect(container.textContent).not.toMatch(/↑|↓/); + expect(container.textContent).not.toMatch(/14d/); + }); + + it('omits the trend badge when price_change_pct_14d is null', () => { + const property = { + ...createMockProperty({ total_price: 2500 }), + price_14d_ago: null, + price_change_pct_14d: null, + } as unknown as PropertyProperties; + const { container } = render(); + expect(container.textContent).not.toMatch(/14d/); + }); }); diff --git a/frontend/src/hooks/__tests__/useMarketTrend.test.tsx b/frontend/src/hooks/__tests__/useMarketTrend.test.tsx new file mode 100644 index 0000000..b605b3d --- /dev/null +++ b/frontend/src/hooks/__tests__/useMarketTrend.test.tsx @@ -0,0 +1,94 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { renderHook, waitFor } from '@testing-library/react'; +import { useMarketTrend } from '@/hooks/useMarketTrend'; +import type { AuthUser } from '@/auth/types'; + +vi.mock('@/services/apiClient', () => ({ + apiRequest: vi.fn(), +})); +import { apiRequest } from '@/services/apiClient'; + +const apiRequestMock = vi.mocked(apiRequest); + +const user: AuthUser = { + sub: 'u', + email: 'a@b.com', + name: 'A', + accessToken: 'tok', + provider: 'oidc', +}; + +describe('useMarketTrend — series fetch + delta derivation', () => { + beforeEach(() => { + apiRequestMock.mockReset(); + }); + + it('hits /api/market_trend with the requested filter params', async () => { + apiRequestMock.mockResolvedValue([]); + const { result } = renderHook(() => useMarketTrend(user, 'RENT', 1, 2, 30)); + await waitFor(() => expect(result.current.isLoading).toBe(false)); + 
expect(apiRequestMock).toHaveBeenCalledWith( + user, + expect.stringContaining('/api/market_trend?'), + ); + const url = apiRequestMock.mock.calls[0][1] as string; + expect(url).toContain('listing_type=RENT'); + expect(url).toContain('min_bedrooms=1'); + expect(url).toContain('max_bedrooms=2'); + expect(url).toContain('days=30'); + }); + + it('computes deltas between the oldest and newest in-window points', async () => { + apiRequestMock.mockResolvedValue([ + { + snapshot_date: '2026-04-16', + listing_count: 1000, + median_total_price: 2500, + median_qmprice: 50, + mean_total_price: 2600, + mean_qmprice: 52, + }, + { + snapshot_date: '2026-05-16', + listing_count: 1050, + median_total_price: 2400, + median_qmprice: 48, + mean_total_price: 2500, + mean_qmprice: 50, + }, + ]); + const { result } = renderHook(() => useMarketTrend(user, 'RENT', 1, 2, 30)); + await waitFor(() => expect(result.current.series.length).toBe(2)); + expect(result.current.deltas.median_total_price.current).toBe(2400); + expect(result.current.deltas.median_total_price.previous).toBe(2500); + // (2400 - 2500) / 2500 * 100 = -4 + expect(result.current.deltas.median_total_price.changePct).toBe(-4); + expect(result.current.deltas.listing_count.changePct).toBe(5); + }); + + it('returns null changePct when there is only one datapoint', async () => { + apiRequestMock.mockResolvedValue([ + { + snapshot_date: '2026-05-16', + listing_count: 1050, + median_total_price: 2400, + median_qmprice: 48, + mean_total_price: 2500, + mean_qmprice: 50, + }, + ]); + const { result } = renderHook(() => useMarketTrend(user, 'RENT', 1, 2, 30)); + await waitFor(() => expect(result.current.series.length).toBe(1)); + expect(result.current.deltas.median_total_price.changePct).toBeNull(); + expect(result.current.deltas.median_total_price.current).toBe(2400); + }); + + it('returns empty series + null deltas when the endpoint is empty', async () => { + apiRequestMock.mockResolvedValue([]); + const { result } = renderHook(() => 
useMarketTrend(user, 'BUY', 1, 2, 30)); + await waitFor(() => expect(result.current.isLoading).toBe(false)); + expect(result.current.series).toEqual([]); + expect(result.current.deltas.median_total_price.current).toBeNull(); + expect(result.current.deltas.median_total_price.changePct).toBeNull(); + }); +}); diff --git a/frontend/src/hooks/useMarketTrend.ts b/frontend/src/hooks/useMarketTrend.ts new file mode 100644 index 0000000..f3d0f2d --- /dev/null +++ b/frontend/src/hooks/useMarketTrend.ts @@ -0,0 +1,93 @@ +// Fetches the daily market aggregate series for a given listing-type + +// bedroom band. Re-fetches when the inputs change. Returns the raw array +// of points plus a derived "now vs N days ago" delta the strip renders. + +import { useEffect, useState } from 'react'; +import type { AuthUser } from '@/auth/types'; +import type { MarketTrendPoint } from '@/types'; +import { apiRequest } from '@/services/apiClient'; + +export interface MarketTrendDelta { + metric: 'median_total_price' | 'median_qmprice' | 'listing_count'; + current: number | null; + previous: number | null; + changePct: number | null; +} + +export interface UseMarketTrendResult { + series: MarketTrendPoint[]; + isLoading: boolean; + error: string | null; + // Convenience: today's value vs the oldest in-window value. + deltas: Record; +} + +function buildDelta( + metric: MarketTrendDelta['metric'], + series: MarketTrendPoint[], +): MarketTrendDelta { + if (series.length < 2) { + const only = series[0]; + return { + metric, + current: only ? 
(only[metric] as number | null) : null, + previous: null, + changePct: null, + }; + } + const current = series[series.length - 1][metric] as number | null; + const previous = series[0][metric] as number | null; + if (current === null || previous === null || previous === 0) { + return { metric, current, previous, changePct: null }; + } + const changePct = Math.round(((current - previous) / previous) * 1000) / 10; + return { metric, current, previous, changePct }; +} + +export function useMarketTrend( + user: AuthUser | null, + listingType: 'RENT' | 'BUY', + minBedrooms: number, + maxBedrooms: number, + days: number = 30, +): UseMarketTrendResult { + const [series, setSeries] = useState([]); + const [isLoading, setIsLoading] = useState(false); + const [error, setError] = useState(null); + + useEffect(() => { + if (!user) return; + let cancelled = false; + setIsLoading(true); + setError(null); + const params = new URLSearchParams({ + listing_type: listingType, + min_bedrooms: String(minBedrooms), + max_bedrooms: String(maxBedrooms), + days: String(days), + }); + apiRequest(user, `/api/market_trend?${params}`) + .then((data) => { + if (cancelled) return; + setSeries(data); + }) + .catch((err: Error) => { + if (cancelled) return; + setError(err.message); + }) + .finally(() => { + if (!cancelled) setIsLoading(false); + }); + return () => { + cancelled = true; + }; + }, [user, listingType, minBedrooms, maxBedrooms, days]); + + const deltas: UseMarketTrendResult['deltas'] = { + median_total_price: buildDelta('median_total_price', series), + median_qmprice: buildDelta('median_qmprice', series), + listing_count: buildDelta('listing_count', series), + }; + + return { series, isLoading, error, deltas }; +} diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index aabb2b2..10c2374 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -24,6 +24,10 @@ export interface PropertyProperties { price_history: PropertyPriceHistory[]; 
listing_type?: 'RENT' | 'BUY'; poi_distances?: POIDistanceInfo[]; + // Trend snapshot maintained by the daily market aggregator (nullable + // until the aggregator has run at least once with this listing in scope). + price_14d_ago?: number | null; + price_change_pct_14d?: number | null; } export interface PropertyFeature { @@ -181,6 +185,16 @@ export interface WSPongMessage { export type WSMessage = WSInitMessage | WSTaskUpdateMessage | WSPongMessage; +// One day of aggregated market stats — see /api/market_trend. +export interface MarketTrendPoint { + snapshot_date: string; // ISO date + listing_count: number; + median_total_price: number | null; + median_qmprice: number | null; + mean_total_price: number | null; + mean_qmprice: number | null; +} + // Decision types // `seen` is a soft-hide: the listing is removed from the main list by default // but can be re-revealed via the "Show hidden" filter toggle. Distinct from diff --git a/models/listing.py b/models/listing.py index 9d969ec..83715f7 100644 --- a/models/listing.py +++ b/models/listing.py @@ -7,6 +7,7 @@ import json from typing import Any, Dict, List from pydantic import BaseModel, Field as PydanticField, model_validator from rec import routing +from sqlalchemy import UniqueConstraint from sqlmodel import JSON, TEXT, SQLModel, Field @@ -92,6 +93,14 @@ class Listing(SQLModel, table=False): sa_type=TEXT, nullable=True, default=None ) # Store as JSON string for simplicity + # Per-listing price-trend snapshot maintained by the daily aggregator. + # `price_14d_ago` is the historical price ~14 days before the most recent + # aggregator run (sourced from price_history_json). `price_change_pct_14d` + # is the % change from that to the current `price` (positive=up, neg=down). + # Both are null when the listing has no entry that old in its history. 
+ price_14d_ago: float | None = Field(default=None, nullable=True) + price_change_pct_14d: float | None = Field(default=None, nullable=True) + @property def is_removed(self) -> bool: if not self.additional_info: @@ -176,6 +185,34 @@ class BuyListing(Listing, table=True): ) # in years, e.g., 90, 80, etc. +class DailyListingAggregate(SQLModel, table=True): + """One row per (snapshot_date, listing_type, bedroom band). + + Written daily by `compute_daily_market_aggregates_task` after the scrape + settles. Drives the `MarketTrendStrip` UI ("get a vibe of the market"). + The (date, listing_type, min_bedrooms, max_bedrooms) tuple is unique; + the aggregator upserts rather than appends so re-running on the same day + refreshes the snapshot instead of duplicating it. + """ + __table_args__ = ( + UniqueConstraint( + "snapshot_date", "listing_type", "min_bedrooms", "max_bedrooms", + name="uq_aggregate_date_filter", + ), + ) + + id: int | None = Field(default=None, primary_key=True) + snapshot_date: datetime = Field(nullable=False, index=True) + listing_type: str = Field(nullable=False) # "RENT" or "BUY" + min_bedrooms: int = Field(nullable=False) + max_bedrooms: int = Field(nullable=False) + listing_count: int = Field(nullable=False) + median_total_price: float | None = Field(default=None, nullable=True) + median_qmprice: float | None = Field(default=None, nullable=True) + mean_total_price: float | None = Field(default=None, nullable=True) + mean_qmprice: float | None = Field(default=None, nullable=True) + + @dataclass(frozen=True) class DestinationMode: destination_address: str diff --git a/services/market_aggregator.py b/services/market_aggregator.py new file mode 100644 index 0000000..7ce511a --- /dev/null +++ b/services/market_aggregator.py @@ -0,0 +1,287 @@ +"""Daily market-trend aggregator. + +Two outputs per run: + +1. Per-listing trend columns. 
For each row in RentListing / BuyListing we + parse `price_history_json` and find the price entry whose `last_seen` was + closest to `lookback_days` ago. The current price and that historical + price land on `price_14d_ago` / `price_change_pct_14d` for the + PropertyCard badge to render. + +2. Aggregate market snapshot. For each configured (listing_type, bedroom + band) we compute median/mean/count over the CURRENT listing pool and + upsert one row in `dailylistingaggregate` keyed on today's date. The + `MarketTrendStrip` UI consumes these rows. + +Both steps are idempotent — re-running on the same day refreshes the +snapshot rather than appending. Designed to fire daily ~04:00 UTC (1h +after the 03:00 RENT scrape so the data is fresh). +""" +from __future__ import annotations + +import json +import logging +import time +from datetime import datetime, timedelta +from statistics import mean, median +from typing import Iterable + +from sqlalchemy import Engine +from sqlmodel import Session, select + +from models.listing import ( + BuyListing, + DailyListingAggregate, + PriceHistoryItem, + RentListing, +) + +logger = logging.getLogger("uvicorn") + +# Default scope: the user's daily filter (1-2 bed, both listing types). +DEFAULT_BEDROOM_BANDS: tuple[tuple[int, int], ...] = ((1, 2),) +DEFAULT_LISTING_TYPES: tuple[str, ...] = ("RENT", "BUY") + +# Trend lookback window for the per-listing badge. Surfaces price moves +# that happened in the last fortnight (long enough for prices to actually +# settle, short enough to feel current). 
DEFAULT_LOOKBACK_DAYS = 14


def _as_naive_utc(moment: datetime) -> datetime:
    """Return `moment` as a naive datetime expressed in UTC.

    Naive values are returned unchanged: they are compared as-is against the
    naive `datetime.utcnow()` cutoff used by this module. Aware values are
    shifted by their UTC offset and stripped of `tzinfo`, so a history that
    mixes both kinds no longer raises `TypeError` on comparison.
    """
    offset = moment.utcoffset()
    if offset is None:
        return moment
    return moment.replace(tzinfo=None) - offset


def _parse_history(price_history_json: str | None) -> list[PriceHistoryItem]:
    """Decode a listing's `price_history_json` into `PriceHistoryItem`s.

    Returns an empty list when the payload is missing, is not valid JSON, or
    does not decode to a JSON array. Entries that lack a key or carry an
    unparseable timestamp / price are skipped instead of failing the listing.
    """
    if not price_history_json:
        return []
    try:
        raw = json.loads(price_history_json)
    except (ValueError, TypeError):
        return []
    if not isinstance(raw, list):
        # "null", a bare number or an object are valid JSON but not a
        # history; iterating them would raise or walk dict keys.
        return []
    out: list[PriceHistoryItem] = []
    for item in raw:
        try:
            out.append(
                PriceHistoryItem(
                    first_seen=datetime.fromisoformat(item["first_seen"]),
                    last_seen=datetime.fromisoformat(item["last_seen"]),
                    price=float(item["price"]),
                )
            )
        except (KeyError, ValueError, TypeError):
            continue
    return out


def _price_at_or_before(
    history: list[PriceHistoryItem], cutoff: datetime
) -> float | None:
    """Return the price of the entry whose `last_seen` is closest to (but
    not after) `cutoff`. Returns None if no entry that old exists.

    Every entry is inspected, so the result does not depend on the history
    being sorted; for a chronologically ordered history the answer is the
    same as a forward scan that stops at the first too-recent entry. When
    several entries share the winning `last_seen`, the later one in the list
    wins.

    NOTE(review): matching on `last_seen` means an entry that was still live
    at `cutoff` (first_seen <= cutoff < last_seen) is ignored, so a price
    change that happened inside the lookback window is only reported when an
    older entry exists. Confirm whether `first_seen` is the intended key.
    """
    limit = _as_naive_utc(cutoff)
    best_seen: datetime | None = None
    best_price: float | None = None
    for item in history:
        seen = _as_naive_utc(item.last_seen)
        if seen > limit:
            continue
        if best_seen is None or seen >= best_seen:
            best_seen, best_price = seen, item.price
    return best_price


def compute_trend_for_listing(
    price_history_json: str | None,
    current_price: float | None,
    *,
    lookback_days: int = DEFAULT_LOOKBACK_DAYS,
    now: datetime | None = None,
) -> tuple[float | None, float | None]:
    """Return `(price_n_days_ago, change_pct)` for one listing.

    `change_pct` is `(current - past) / past * 100` rounded to 2dp; positive
    = price went up, negative = down. Both are None when there's no entry
    that old in history, or when either price is not a positive finite
    number (None, zero, negative, NaN, infinity).

    `now` defaults to the current naive UTC time; the cutoff is
    `now - lookback_days`.
    """
    if not isinstance(current_price, (int, float)):
        return None, None
    # Chained comparison is False for NaN and excludes +inf in one test.
    if not 0 < current_price < float("inf"):
        return None, None
    cutoff = (now or datetime.utcnow()) - timedelta(days=lookback_days)
    past = _price_at_or_before(_parse_history(price_history_json), cutoff)
    if past is None or not 0 < past < float("inf"):
        return None, None
    pct = round((current_price - past) / past * 100.0, 2)
    return past, pct


def update_per_listing_trend(
    engine: Engine,
    *,
    lookback_days: int = DEFAULT_LOOKBACK_DAYS,
    batch_size: int = 1000,
    now: datetime | None = None,
) -> dict[str, int]:
    """Walk every RentListing + BuyListing, recompute trend columns, write.

    Rows are read in pages of `batch_size`, ordered by primary key so that
    OFFSET/LIMIT paging visits each row exactly once, and each page is
    committed before the next one is read. A row is only written when one of
    its two trend columns actually changes, which keeps re-runs cheap.

    Returns a dict with the number of changed rows per table
    (`rent_updated`, `buy_updated`).

    Raises:
        ValueError: if `batch_size` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    # Resolve the clock once so every listing is measured against the same
    # cutoff, however long the pass takes.
    now = now or datetime.utcnow()
    counts = {"rent_updated": 0, "buy_updated": 0}
    t0 = time.monotonic()
    for model_name, model in (("rent", RentListing), ("buy", BuyListing)):
        # Without ORDER BY the database may return pages in any order, which
        # lets OFFSET paging skip or repeat rows.
        pk_columns = tuple(model.__table__.primary_key.columns)
        with Session(engine) as session:
            offset = 0
            while True:
                stmt = (
                    select(model)
                    .order_by(*pk_columns)
                    .offset(offset)
                    .limit(batch_size)
                )
                rows: list = list(session.exec(stmt).all())
                if not rows:
                    break
                for row in rows:
                    past, pct = compute_trend_for_listing(
                        row.price_history_json,
                        row.price,
                        lookback_days=lookback_days,
                        now=now,
                    )
                    if row.price_14d_ago != past or row.price_change_pct_14d != pct:
                        row.price_14d_ago = past
                        row.price_change_pct_14d = pct
                        session.add(row)
                        counts[f"{model_name}_updated"] += 1
                session.commit()
                if len(rows) < batch_size:
                    break
                offset += batch_size
    logger.info(
        "Per-listing trend updated in %.1fs: rent=%d buy=%d (lookback=%dd)",
        time.monotonic() - t0,
        counts["rent_updated"],
        counts["buy_updated"],
        lookback_days,
    )
    return counts


def _stats(values: Iterable[float]) -> dict[str, float | None]:
    """Median + mean over the positive finite entries; null for empty input.

    None, non-numeric values, zero, negatives, NaN and infinities are
    dropped before aggregating. `count` is the number of values kept; the
    mean is rounded to 2dp.
    """
    finite = [
        v
        for v in values
        if isinstance(v, (int, float)) and 0 < v < float("inf")
    ]
    if not finite:
        return {"median": None, "mean": None, "count": 0}
    return {
        "median": float(median(finite)),
        "mean": round(float(mean(finite)), 2),
        "count": len(finite),
    }
compute_aggregate_snapshot(
+    engine: Engine,
+    *,
+    listing_types: tuple[str, ...] = DEFAULT_LISTING_TYPES,
+    bedroom_bands: tuple[tuple[int, int], ...] = DEFAULT_BEDROOM_BANDS,
+    snapshot_date: datetime | None = None,
+) -> list[DailyListingAggregate]:
+    """Upsert one aggregate row per (listing_type, bedroom band) keyed on
+    `snapshot_date` truncated to midnight (default: today, UTC); return the rows.
+
+    The upsert is dialect-aware (MySQL `ON DUPLICATE KEY UPDATE`, otherwise
+    SQLite `ON CONFLICT DO UPDATE`), so a same-day re-run refreshes in place.
+    """
+    today = (snapshot_date or datetime.utcnow()).replace(
+        hour=0, minute=0, second=0, microsecond=0
+    )
+    written: list[DailyListingAggregate] = []
+    dialect = engine.dialect.name
+    # Keep loaded rows usable after later commits and after the session closes.
+    with Session(engine, expire_on_commit=False) as session:
+        for listing_type in listing_types:
+            model = RentListing if listing_type == "RENT" else BuyListing
+            for min_bed, max_bed in bedroom_bands:
+                stmt = select(model.price, model.square_meters).where(
+                    model.number_of_bedrooms >= min_bed,
+                    model.number_of_bedrooms <= max_bed,
+                )
+                rows = list(session.exec(stmt).all())
+                prices = [r[0] for r in rows]
+                qmprices = [
+                    (r[0] / r[1])
+                    for r in rows
+                    if r[0] is not None and r[1] is not None and r[1] > 0
+                ]
+                price_stats = _stats(prices)
+                qm_stats = _stats(qmprices)
+                values = {
+                    "snapshot_date": today,
+                    "listing_type": listing_type,
+                    "min_bedrooms": min_bed,
+                    "max_bedrooms": max_bed,
+                    "listing_count": price_stats["count"],
+                    "median_total_price": price_stats["median"],
+                    "median_qmprice": qm_stats["median"],
+                    "mean_total_price": price_stats["mean"],
+                    "mean_qmprice": qm_stats["mean"],
+                }
+                if dialect == "mysql":
+                    from sqlalchemy.dialects.mysql import insert as mysql_insert
+                    stmt_ins = mysql_insert(DailyListingAggregate).values(**values)
+                    stmt_ins = stmt_ins.on_duplicate_key_update(
+                        listing_count=stmt_ins.inserted.listing_count,
+                        median_total_price=stmt_ins.inserted.median_total_price,
+                        median_qmprice=stmt_ins.inserted.median_qmprice,
+                        mean_total_price=stmt_ins.inserted.mean_total_price,
+                        mean_qmprice=stmt_ins.inserted.mean_qmprice,
+                    )
+                else:
+                    from sqlalchemy.dialects.sqlite import insert as sqlite_insert
+                    stmt_ins = sqlite_insert(DailyListingAggregate).values(**values)
+                    stmt_ins = stmt_ins.on_conflict_do_update(
+                        index_elements=[
+                            "snapshot_date", "listing_type",
+                            "min_bedrooms", "max_bedrooms",
+                        ],
+                        set_={
+                            "listing_count": stmt_ins.excluded.listing_count,
+                            "median_total_price": stmt_ins.excluded.median_total_price,
+                            "median_qmprice": stmt_ins.excluded.median_qmprice,
+                            "mean_total_price": stmt_ins.excluded.mean_total_price,
+                            "mean_qmprice": stmt_ins.excluded.mean_qmprice,
+                        },
+                    )
+                session.execute(stmt_ins)
+                session.commit()
+                row = session.exec(
+                    select(DailyListingAggregate).where(
+                        DailyListingAggregate.snapshot_date == today,
+                        DailyListingAggregate.listing_type == listing_type,
+                        DailyListingAggregate.min_bedrooms == min_bed,
+                        DailyListingAggregate.max_bedrooms == max_bed,
+                    )
+                ).first()
+                if row is not None:
+                    written.append(row)
+                logger.info(
+                    "Aggregate %s %d-%d on %s: count=%s median=%s/%s mean=%s/%s",
+                    listing_type, min_bed, max_bed, today.date(),
+                    price_stats["count"],
+                    price_stats["median"], qm_stats["median"],
+                    price_stats["mean"], qm_stats["mean"],
+                )
+    return written
+
+
+def fetch_trend_series(
+    engine: Engine,
+    *,
+    listing_type: str,
+    min_bedrooms: int,
+    max_bedrooms: int,
+    days: int = 30,
+) -> list[DailyListingAggregate]:
+    """Return the aggregate rows for the last `days` days, ordered ascending
+    by date.
Empty list when no rows match — the strip handles that case."""
+    cutoff = datetime.utcnow() - timedelta(days=days)
+    with Session(engine) as session:
+        stmt = (
+            select(DailyListingAggregate)
+            .where(
+                DailyListingAggregate.listing_type == listing_type,
+                DailyListingAggregate.min_bedrooms == min_bedrooms,
+                DailyListingAggregate.max_bedrooms == max_bedrooms,
+                DailyListingAggregate.snapshot_date >= cutoff,
+            )
+            .order_by(DailyListingAggregate.snapshot_date)
+        )
+        return list(session.exec(stmt).all())
diff --git a/tasks/listing_tasks.py b/tasks/listing_tasks.py
index 808823c..51a70c0 100644
--- a/tasks/listing_tasks.py
+++ b/tasks/listing_tasks.py
@@ -656,3 +656,15 @@ def setup_periodic_tasks(sender, **kwargs):
             dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
             name=schedule.name,
         )
+
+    # Daily market aggregator — fires after the 03:00 RENT scrape so the
+    # snapshot reflects today's freshly-pulled data. Imported lazily to
+    # avoid a circular import (market_tasks imports celery_app, which
+    # imports listing_tasks via the include list).
+    from tasks.market_tasks import compute_daily_market_aggregates_task
+    celery_logger.info("Registering periodic task: daily-market-aggregator at 4:00")
+    sender.add_periodic_task(
+        crontab(minute="0", hour="4", day_of_week="*"),
+        compute_daily_market_aggregates_task.s(),
+        name="daily-market-aggregator",
+    )
diff --git a/tasks/market_tasks.py b/tasks/market_tasks.py
new file mode 100644
index 0000000..e57fd79
--- /dev/null
+++ b/tasks/market_tasks.py
@@ -0,0 +1,59 @@
+"""Celery task that runs the daily market-trend aggregation.
+
+Scheduled for 04:00 UTC, one hour after the 03:00 RENT scrape, so the data
+is fresh. It delegates to `services.market_aggregator` in two stages:
+  1. Recompute per-listing `price_14d_ago` / `price_change_pct_14d`.
+  2. Upsert the per-(listing_type, bedroom-band) row in
+     `dailylistingaggregate` for today's snapshot.
+
+Idempotent: a same-day re-run refreshes both surfaces in place.
+"""
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from celery_app import app
+from database import engine
+from services import market_aggregator
+
+celery_logger = logging.getLogger("celery_app")
+
+
+def _serialise_aggregate(row: Any) -> dict[str, Any]:
+    """Project one aggregate row onto the plain fields the task reports."""
+    return {
+        "snapshot_date": row.snapshot_date.isoformat(),
+        "listing_type": row.listing_type,
+        "min_bedrooms": row.min_bedrooms,
+        "max_bedrooms": row.max_bedrooms,
+        "listing_count": row.listing_count,
+        "median_total_price": row.median_total_price,
+        "median_qmprice": row.median_qmprice,
+    }
+
+
+@app.task(
+    bind=True,
+    name="tasks.market_tasks.compute_daily_market_aggregates_task",
+    time_limit=3600,
+    soft_time_limit=3500,
+    acks_late=True,
+)
+def compute_daily_market_aggregates_task(self: Any) -> dict[str, Any]:
+    """Run the per-listing trend pass, then the aggregate snapshot.
+
+    Returns a dict with `status`, the `per_listing` result passed through
+    unchanged, and `aggregates`: one plain dict per returned aggregate row.
+    """
+    celery_logger.info("Starting daily market aggregator (task=%s)", self.request.id)
+    trend_counts = market_aggregator.update_per_listing_trend(engine)
+    snapshot_rows = market_aggregator.compute_aggregate_snapshot(engine)
+    serialised = [_serialise_aggregate(row) for row in snapshot_rows]
+    celery_logger.info(
+        "Daily market aggregator complete: rent_updated=%s buy_updated=%s aggregates=%s",
+        trend_counts.get("rent_updated"),
+        trend_counts.get("buy_updated"),
+        len(snapshot_rows),
+    )
+    return {"status": "ok", "per_listing": trend_counts, "aggregates": serialised}
diff --git a/tests/unit/test_market_aggregator.py b/tests/unit/test_market_aggregator.py
new file mode 100644
index 0000000..176f466
--- /dev/null
+++ b/tests/unit/test_market_aggregator.py
@@ -0,0 +1,163 @@
+"""Unit tests for the daily market aggregator."""
+from __future__ import annotations
+
+import json
+from datetime import datetime, timedelta
+
+import pytest
+from sqlalchemy import create_engine
+from sqlmodel import SQLModel, Session
+
+from models.listing import (
+    DailyListingAggregate,
+    FurnishType,
+    Listing,
+    ListingSite,
+    RentListing,
+)
+from services import market_aggregator
+
+
+# --- compute_trend_for_listing
--------------------------------------------
+
+def _hist(entries: list[tuple[datetime, float]]) -> str:  # (timestamp, price) pairs -> history JSON
+    return json.dumps(
+        [
+            {
+                "first_seen": fs.isoformat(),
+                "last_seen": fs.isoformat(),
+                "price": p,
+            }
+            for fs, p in entries
+        ]
+    )
+
+
+class TestComputeTrendForListing:
+    def test_null_history_returns_nones(self) -> None:
+        past, pct = market_aggregator.compute_trend_for_listing(None, 2500)
+        assert past is None and pct is None
+
+    def test_empty_history_returns_nones(self) -> None:
+        past, pct = market_aggregator.compute_trend_for_listing("[]", 2500)
+        assert past is None and pct is None
+
+    def test_malformed_history_returns_nones(self) -> None:
+        past, pct = market_aggregator.compute_trend_for_listing("not json", 2500)
+        assert past is None and pct is None
+
+    def test_history_only_recent_returns_nones(self) -> None:
+        """The only history entry is 2 days old, newer than the 14-day cutoff."""
+        now = datetime(2026, 5, 16, 12, 0, 0)
+        history = _hist([(now - timedelta(days=2), 2500)])
+        past, pct = market_aggregator.compute_trend_for_listing(
+            history, 2500, lookback_days=14, now=now,
+        )
+        assert past is None and pct is None
+
+    def test_price_dropped(self) -> None:
+        now = datetime(2026, 5, 16, 12, 0, 0)
+        history = _hist([
+            (now - timedelta(days=30), 2800),
+            (now - timedelta(days=20), 2700),
+            (now - timedelta(days=10), 2500),
+        ])
+        # Lookback 14d → cutoff at day -14, latest entry on/before is day -20 (price 2700).
+        past, pct = market_aggregator.compute_trend_for_listing(
+            history, 2500, lookback_days=14, now=now,
+        )
+        assert past == 2700
+        assert pct == round((2500 - 2700) / 2700 * 100, 2)
+        assert pct < 0
+
+    def test_price_rose(self) -> None:
+        now = datetime(2026, 5, 16, 12, 0, 0)
+        history = _hist([(now - timedelta(days=20), 2000)])
+        past, pct = market_aggregator.compute_trend_for_listing(
+            history, 2200, lookback_days=14, now=now,
+        )
+        assert past == 2000
+        assert pct == 10.0  # (2200 - 2000) / 2000 * 100
+
+    def test_current_price_zero_returns_nones(self) -> None:
+        past, pct = market_aggregator.compute_trend_for_listing(
+            _hist([(datetime(2026, 1, 1), 2500)]), 0,  # current price is 0
+        )
+        assert past is None and pct is None
+
+
+# --- _stats ----------------------------------------------------------------
+
+class TestStats:
+    def test_empty(self) -> None:
+        out = market_aggregator._stats([])
+        assert out == {"median": None, "mean": None, "count": 0}
+
+    def test_filters_nonpositive(self) -> None:
+        out = market_aggregator._stats([0, -1, None, 2000, 3000])  # type: ignore[list-item]
+        assert out["count"] == 2  # only 2000 and 3000 are counted
+        assert out["median"] == 2500
+        assert out["mean"] == 2500.0
+
+    def test_single_value(self) -> None:
+        out = market_aggregator._stats([1500])
+        assert out == {"median": 1500.0, "mean": 1500.0, "count": 1}
+
+
+# --- compute_aggregate_snapshot — integration on SQLite ------------------
+
+@pytest.fixture
+def engine_with_seed():
+    """In-memory SQLite seeded with five RentListings: four in the 1-2 bed band, one 3-bed."""
+    engine = create_engine("sqlite://")
+    SQLModel.metadata.create_all(engine)
+    with Session(engine) as session:
+        for i, (rooms, price, sqm) in enumerate(
+            [(1, 2000, 40), (1, 2500, 50), (2, 3000, 60), (2, 4000, 80), (3, 5000, 100)],
+            start=1,
+        ):
+            session.add(
+                RentListing(
+                    id=i, price=price, number_of_bedrooms=rooms,
+                    square_meters=sqm, longitude=0.0, latitude=0.0,
+                    price_history_json="[]", listing_site=ListingSite.RIGHTMOVE,
+                    last_seen=datetime(2026, 5, 16),
+                    furnish_type=FurnishType.UNKNOWN,
+                )
+            )
+        session.commit()
+    return engine
+
+
+class TestComputeAggregateSnapshot:
+    def test_writes_one_row_per_band(self, engine_with_seed) -> None:
+        rows = market_aggregator.compute_aggregate_snapshot(
+            engine_with_seed,
+            listing_types=("RENT",),
+            bedroom_bands=((1, 2),),
+            snapshot_date=datetime(2026, 5, 16),
+        )
+        assert len(rows) == 1
+        row = rows[0]
+        assert row.listing_count == 4  # excludes the 3-bed
+        assert row.median_total_price == 2750.0  # median of [2000,2500,3000,4000]
+        assert row.mean_total_price == 2875.0  # (2000 + 2500 + 3000 + 4000) / 4
+        # qmprice values: 50, 50, 50, 50 → median 50
+        assert row.median_qmprice == 50.0
+
+    def test_upsert_idempotent(self, engine_with_seed) -> None:
+        market_aggregator.compute_aggregate_snapshot(
+            engine_with_seed,
+            listing_types=("RENT",),
+            bedroom_bands=((1, 2),),
+            snapshot_date=datetime(2026, 5, 16),
+        )
+        market_aggregator.compute_aggregate_snapshot(  # second run, identical arguments
+            engine_with_seed,
+            listing_types=("RENT",),
+            bedroom_bands=((1, 2),),
+            snapshot_date=datetime(2026, 5, 16),
+        )
+        with Session(engine_with_seed) as session:
+            count = session.query(DailyListingAggregate).count()
+        assert count == 1  # no duplicate row
diff --git a/ui_exporter.py b/ui_exporter.py
index 179fba5..1fbbbf5 100644
--- a/ui_exporter.py
+++ b/ui_exporter.py
@@ -143,6 +143,10 @@ def convert_to_geojson_feature(listing: RentListing | BuyListing) -> dict[str, Any]:
         "price_history": [item.to_dict() for item in listing.price_history],
         "agency": listing.agency,
         "available_from": property_info.get("letDateAvailable", None),
+        # Per-listing trend snapshot (populated by the daily aggregator —
+        # null until the aggregator has seen this listing at least once).
+        "price_14d_ago": listing.price_14d_ago,
+        "price_change_pct_14d": listing.price_change_pct_14d,
     }
 
     if isinstance(listing, BuyListing):