add worker api to refresh data in the background

This commit is contained in:
Viktor Barzin 2025-06-21 12:49:04 +00:00
parent fc722b6b5f
commit a7e0773c0a
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863
12 changed files with 465 additions and 38 deletions

View file

@ -1,7 +1,12 @@
from pathlib import Path
import queue
from threading import Thread
from typing import Annotated
import uuid
from api.auth import get_current_user
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
from fastapi import Depends, FastAPI, Query
from api.worker import TaskStatus, dump_listings_worker, task_queue, task_results
from fastapi import Depends, FastAPI, HTTPException, Query
from api.auth import User
from models.listing import QueryParameters
from repositories.listing_repository import ListingRepository
@ -10,9 +15,11 @@ from database import engine
from fastapi.middleware.cors import CORSMiddleware
from ui_exporter import export_immoweb
app = FastAPI()
# Start worker thread
Thread(target=dump_listings_worker, daemon=True).start()
# Allow CORS (for React frontend)
app.add_middleware(
CORSMiddleware,
@ -39,3 +46,33 @@ async def get_listing_geojson(
repository, query_parameters=query_parameters, limit=None
)
return geojson_data
@app.post("/api/refresh_listings")
async def refresh_listings(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Query()],
) -> dict[str, str]:
# Submit processing task
task_id = str(uuid.uuid4())
task_results[task_id] = {"status": TaskStatus.QUEUED}
try:
task_queue.put_nowait(
(task_id, query_parameters),
)
except queue.Full:
raise HTTPException(
status_code=429,
detail="Already processing at maximum capacity. Please try again later",
)
return {"task_id": task_id}
@app.get("/api/task_status")
async def get_task_status(
user: Annotated[User, Depends(get_current_user)],
task_id: str,
) -> dict[str, str]:
if task_id not in task_results:
return {"status": "not_found"}
return task_results[task_id]

49
crawler/api/worker.py Normal file
View file

@ -0,0 +1,49 @@
import asyncio
import enum
import importlib
from pathlib import Path
from queue import Queue
from database import engine
from models.listing import Listing, QueryParameters
from pydantic import BaseModel
from repositories.listing_repository import ListingRepository
dump_listings_module = importlib.import_module("1_dump_listings")
# In-memory task queue and results store
task_queue = Queue(maxsize=1) # Disallow multiple in flight requests for now
task_results = {}
def dump_listings_worker() -> None:
return asyncio.run(_dump_listings_worker())
class TaskStatus(enum.StrEnum):
QUEUED = "queued"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
async def _dump_listings_worker() -> None: # global results is updated
"""Background worker that processes tasks"""
repository = ListingRepository(engine)
data_dir_path = Path("data/rs")
while True:
task_id, task_data = task_queue.get()
task_results[task_id] = {"status": TaskStatus.PROCESSING}
query_parameters = task_data
try:
new_listings = await dump_listings_module.dump_listings_full(
query_parameters, repository, data_dir_path
)
task_results[task_id] = {
"status": "completed",
"result": f"Fetched {len(new_listings)} new listings for query {task_data}",
}
except Exception as e:
task_results[task_id] = {"status": TaskStatus.FAILED, "error": str(e)}
finally:
task_queue.task_done()

View file

@ -10,7 +10,9 @@
"dependencies": {
"@hookform/resolvers": "^5.1.1",
"@radix-ui/react-dialog": "^1.1.14",
"@radix-ui/react-hover-card": "^1.1.14",
"@radix-ui/react-label": "^2.1.7",
"@radix-ui/react-progress": "^1.1.7",
"@radix-ui/react-scroll-area": "^1.2.9",
"@radix-ui/react-select": "^2.2.5",
"@radix-ui/react-separator": "^1.1.7",
@ -1108,6 +1110,37 @@
}
}
},
"node_modules/@radix-ui/react-hover-card": {
"version": "1.1.14",
"resolved": "https://registry.npmjs.org/@radix-ui/react-hover-card/-/react-hover-card-1.1.14.tgz",
"integrity": "sha512-CPYZ24Mhirm+g6D8jArmLzjYu4Eyg3TTUHswR26QgzXBHBe64BO/RHOJKzmF/Dxb4y4f9PKyJdwm/O/AhNkb+Q==",
"license": "MIT",
"dependencies": {
"@radix-ui/primitive": "1.1.2",
"@radix-ui/react-compose-refs": "1.1.2",
"@radix-ui/react-context": "1.1.2",
"@radix-ui/react-dismissable-layer": "1.1.10",
"@radix-ui/react-popper": "1.2.7",
"@radix-ui/react-portal": "1.1.9",
"@radix-ui/react-presence": "1.1.4",
"@radix-ui/react-primitive": "2.1.3",
"@radix-ui/react-use-controllable-state": "1.2.2"
},
"peerDependencies": {
"@types/react": "*",
"@types/react-dom": "*",
"react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc",
"react-dom": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc"
},
"peerDependenciesMeta": {
"@types/react": {
"optional": true
},
"@types/react-dom": {
"optional": true
}
}
},
"node_modules/@radix-ui/react-id": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/@radix-ui/react-id/-/react-id-1.1.1.tgz",
@ -1252,6 +1285,30 @@
}
}
},
"node_modules/@radix-ui/react-progress": {
"version": "1.1.7",
"resolved": "https://registry.npmjs.org/@radix-ui/react-progress/-/react-progress-1.1.7.tgz",
"integrity": "sha512-vPdg/tF6YC/ynuBIJlk1mm7Le0VgW6ub6J2UWnTQ7/D23KXcPI1qy+0vBkgKgd38RCMJavBXpB83HPNFMTb0Fg==",
"license": "MIT",
"dependencies": {
"@radix-ui/react-context": "1.1.2",
"@radix-ui/react-primitive": "2.1.3"
},
"peerDependencies": {
"@types/react": "*",
"@types/react-dom": "*",
"react": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc",
"react-dom": "^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc"
},
"peerDependenciesMeta": {
"@types/react": {
"optional": true
},
"@types/react-dom": {
"optional": true
}
}
},
"node_modules/@radix-ui/react-scroll-area": {
"version": "1.2.9",
"resolved": "https://registry.npmjs.org/@radix-ui/react-scroll-area/-/react-scroll-area-1.2.9.tgz",

View file

@ -12,7 +12,9 @@
"dependencies": {
"@hookform/resolvers": "^5.1.1",
"@radix-ui/react-dialog": "^1.1.14",
"@radix-ui/react-hover-card": "^1.1.14",
"@radix-ui/react-label": "^2.1.7",
"@radix-ui/react-progress": "^1.1.7",
"@radix-ui/react-scroll-area": "^1.2.9",
"@radix-ui/react-select": "^2.2.5",
"@radix-ui/react-separator": "^1.1.7",

View file

@ -3,6 +3,7 @@ import { useEffect, useState } from 'react';
import './App.css';
import { AppSidebar } from './AppSidebar';
import { getUser, handleCallback, logout } from './auth/authService';
import ActiveQuery from './components/ActiveQuery';
import LoginModal from './components/LoginModal';
import { Map } from './components/Map';
import { Parameters, type ParameterValues } from './components/Parameters';
@ -13,6 +14,7 @@ import { SidebarInset, SidebarProvider, SidebarTrigger } from './components/ui/s
function App() {
const [listingData, setListingData] = useState({});
const [taskID, setTaskID] = useState<string | null>(null);
const [user, setUser] = useState<User | null>(null);
useEffect(() => {
@ -29,9 +31,8 @@ function App() {
}, []);
const [isParametersModalOpen, setIsParametersModalOpen] = useState(true)
const [error, setError] = useState('')
const [queryParameters, setQueryParameters] = useState<ParameterValues | null>(null)
const fetchData = async (parameters: ParameterValues) => {
const fetchData = async (baseQueyrUri: string, parameters: ParameterValues, method: string = 'GET') => {
const accessToken = user?.access_token;
const queryString = new URLSearchParams();
queryString.append('listing_type', parameters.listing_type)
@ -51,34 +52,39 @@ function App() {
queryString.append("min_sqm", parameters.min_sqm.toString());
}
try {
const response = await fetch("/api/listing_geojson?" + queryString,
{
method: 'GET',
headers: {
'Authorization': `Bearer ${accessToken}`, // Pass the token
'Content-Type': 'application/json',
},
}
);
if (!response.ok) throw new Error('Error: ' + response.json());
const data: Response = await response.json();
return data;
} catch (err) {
setError('Failed to fetch data: ' + err);
alert(JSON.stringify(err))
} finally {
const response = await fetch(baseQueyrUri + '?' + queryString,
{
method: method,
headers: {
'Authorization': `Bearer ${accessToken}`, // Pass the token
'Content-Type': 'application/json',
},
}
);
if (!response.ok) {
throw new Error('Error: ' + response.status);
}
const data: Response = await response.json();
return data;
};
const onSubmit = async (parameters: ParameterValues) => {
const onSubmit = async (action: 'fetch-data' | 'visualize', parameters: ParameterValues) => {
// Fetch listing data
setQueryParameters(parameters)
const data = await fetchData(parameters);
console.log(data)
if (data) {
setListingData(data);
setIsParametersModalOpen(false)
let data = null;
if (action === 'visualize') {
data = await fetchData("/api/listing_geojson", parameters);
if (data) {
setListingData(data);
}
} else if (action === 'fetch-data') {
data = await fetchData("/api/refresh_listings", parameters, 'POST');
if (data) {
// @ts-expect-error
setTaskID(data.task_id)
}
}
console.log(data)
setIsParametersModalOpen(false)
}
@ -116,6 +122,7 @@ function App() {
<h1>Welcome, {user.profile.name}!</h1>
<Button onClick={logout}>Logout</Button>
<Parameters onSubmit={onSubmit} isOpen={isParametersModalOpen} />
<ActiveQuery taskID={taskID} />
</div>
{Object.keys(listingData).length > 0 &&
<div className="flex-1 w-full relative" style={{ minHeight: 0, marginBottom: '8rem' }}>

View file

@ -0,0 +1,138 @@
import { getUser } from '@/auth/authService';
import type { User } from 'oidc-client-ts';
import React, { useEffect, useState } from 'react';
import { HoverCard, HoverCardContent, HoverCardTrigger } from './ui/hover-card';
import { Progress } from './ui/progress';
interface ModalProps {
taskID: string | null;
}
const fetchTaskStatus = async (user: User, taskID: string) => {
const accessToken = user?.access_token;
const response = await fetch(`/api/task_status?task_id=${taskID}`, {
method: 'GET',
headers: {
'Authorization': `Bearer ${accessToken}`, // Pass the token
'Content-Type': 'application/json',
},
});
if (!response.ok) {
throw new Error(`Failed to fetch task status: ${response.status}`);
}
const data =
await response.json();
return data;
};
enum TaskStatus {
QUEUED = 'queued',
PROCESSING = 'processing',
COMPLETED = 'completed',
FAILED = 'failed',
}
const taskStatusToProgress = (taskStatus: TaskStatus): number => {
switch (taskStatus) {
case TaskStatus.QUEUED:
return 0.33; // Queued status
case TaskStatus.PROCESSING:
return 0.66; // Processing status
case TaskStatus.COMPLETED:
return 1.0; // Completed status
default:
throw new Error('Unknown task status: ' + status);
}
}
const getTaskStatus = (status: string): TaskStatus => {
switch (status.toLowerCase()) {
case 'queued':
return TaskStatus.QUEUED;
case 'processing':
return TaskStatus.PROCESSING;
case 'completed':
return TaskStatus.COMPLETED;
case 'failed':
return TaskStatus.FAILED;
default:
throw new Error('Unknown task status: ' + status);
}
};
const ActiveQuery: React.FC<ModalProps> = ({
taskID
}) => {
const [user, setUser] = useState<User | null>(null);
useEffect(() => {
getUser().then(setUser);
}, []);
const [progressPercentage, setProgressPercentage] = useState<number>(0);
const [taskStatus, setTaskStatus] = useState<TaskStatus | null>(TaskStatus.QUEUED);
const [lastUpdateTime, setLastUpdateTime] = useState<Date>(new Date());
// fetch status periodically
// maybe move to ws one day
useEffect(() => {
const interval = setInterval
(async () => {
if (!user || !taskID) {
return;
}
let data = null
try {
data = await fetchTaskStatus(user, taskID);
} catch (error: any) {
clearInterval(interval);
setTaskStatus(TaskStatus.FAILED)
alert(error)
}
if (!data) {
clearInterval(interval);
return;
}
setLastUpdateTime(new Date());
const taskStatus = getTaskStatus(data.status);
if (taskStatus === TaskStatus.FAILED) {
clearInterval(interval);
throw new Error('Task failed');
}
setTaskStatus(taskStatus);
const progress = taskStatusToProgress(taskStatus);
setProgressPercentage(progress * 100);
if (taskStatus === TaskStatus.COMPLETED) {
clearInterval(interval);
return;
}
}, 5000); // every 5 seconds
return () => clearInterval(interval);
}, [taskID]);
if (!taskID) {
return null;
}
return (
<>
<div>
<HoverCard>
<HoverCardTrigger>
{taskStatus && <p>Task status: {taskStatus} </p>}
<Progress value={progressPercentage} />
</HoverCardTrigger>
<HoverCardContent>
Task ID: {taskID}
<br />
Last updated: {lastUpdateTime.toLocaleString()}
</HoverCardContent>
</HoverCard>
</div>
</>
)
};
export default ActiveQuery;

View file

@ -1,4 +1,4 @@
// // @ts-nocheck
// @ts-nocheck
import crossfilter from "crossfilter2";
import * as d3 from "d3";
import mapboxgl from "mapbox-gl";
@ -195,7 +195,7 @@ export function Map(
.call(xAxis);
}
function openListingsDialog(longtitude, latitude) {
function openListingsDialog(longtitude: number, latitude: number) {
const searchBuffer = 0.001 // ~100m
const properties = heatmap._tree.search({
minX: longtitude - searchBuffer,

View file

@ -1,5 +1,6 @@
import { zodResolver } from "@hookform/resolvers/zod";
import { DialogTitle } from "@radix-ui/react-dialog";
import { useState } from "react";
import { useForm } from "react-hook-form";
import { z } from "zod";
import { Button } from "./ui/button";
@ -34,13 +35,13 @@ export interface ParameterValues {
export function Parameters(
props: {
isOpen: boolean,
onSubmit: (fromValues: ParameterValues) => void,
onSubmit: (action: 'fetch-data' | 'visualize', fromValues: ParameterValues) => void,
}
) {
const {
register,
} = useForm<ParameterValues>()
// const onSubmit: SubmitHandler<ParameterValues> = (data) => console.log(data)
const [action, setAction] = useState<'fetch-data' | 'visualize' | null>(null)
const formSchema = z.object({
metric: z.nativeEnum(Metric, { required_error: "Metric is required" }),
@ -48,7 +49,7 @@ export function Parameters(
min_bedrooms: z.number().min(1).max(10).optional(),
max_bedrooms: z.number().min(1).max(10).optional(),
max_price: z.number().optional(),
min_price: z.number().optional(),
min_price: z.number().min(0).optional(),
min_sqm: z.number().optional(),
})
const form = useForm<z.infer<typeof formSchema>>({
@ -58,7 +59,7 @@ export function Parameters(
min_bedrooms: 1,
max_bedrooms: 3,
max_price: 3000,
min_price: 0,
min_price: 2000,
min_sqm: 0,
},
})
@ -67,11 +68,15 @@ export function Parameters(
// Do something with the form values.
// ✅ This will be type-safe and validated.
console.log(values)
props.onSubmit(values)
if (action) {
props.onSubmit(action, values)
}
}
return <>
{/* <Dialog open={props.isOpen} > */}
<Dialog >
<DialogTrigger asChild>
<Button variant="outline">Open Parameters</Button>
@ -195,7 +200,8 @@ export function Parameters(
)}
/>
<Button type="submit">Submit</Button>
<Button type="submit" value={"visualize"} onClick={() => setAction("visualize")}>Visualize</Button>
<Button type="submit" value={"fetch-data"} onClick={() => setAction("fetch-data")}>Fetch data</Button>
</form>
</Form>
</DialogContent>

View file

@ -0,0 +1,46 @@
import * as React from "react"
import { Slot } from "@radix-ui/react-slot"
import { cva, type VariantProps } from "class-variance-authority"
import { cn } from "@/lib/utils"
const badgeVariants = cva(
"inline-flex items-center justify-center rounded-md border px-2 py-0.5 text-xs font-medium w-fit whitespace-nowrap shrink-0 [&>svg]:size-3 gap-1 [&>svg]:pointer-events-none focus-visible:border-ring focus-visible:ring-ring/50 focus-visible:ring-[3px] aria-invalid:ring-destructive/20 dark:aria-invalid:ring-destructive/40 aria-invalid:border-destructive transition-[color,box-shadow] overflow-hidden",
{
variants: {
variant: {
default:
"border-transparent bg-primary text-primary-foreground [a&]:hover:bg-primary/90",
secondary:
"border-transparent bg-secondary text-secondary-foreground [a&]:hover:bg-secondary/90",
destructive:
"border-transparent bg-destructive text-white [a&]:hover:bg-destructive/90 focus-visible:ring-destructive/20 dark:focus-visible:ring-destructive/40 dark:bg-destructive/60",
outline:
"text-foreground [a&]:hover:bg-accent [a&]:hover:text-accent-foreground",
},
},
defaultVariants: {
variant: "default",
},
}
)
function Badge({
className,
variant,
asChild = false,
...props
}: React.ComponentProps<"span"> &
VariantProps<typeof badgeVariants> & { asChild?: boolean }) {
const Comp = asChild ? Slot : "span"
return (
<Comp
data-slot="badge"
className={cn(badgeVariants({ variant }), className)}
{...props}
/>
)
}
export { Badge, badgeVariants }

View file

@ -0,0 +1,42 @@
import * as React from "react"
import * as HoverCardPrimitive from "@radix-ui/react-hover-card"
import { cn } from "@/lib/utils"
function HoverCard({
...props
}: React.ComponentProps<typeof HoverCardPrimitive.Root>) {
return <HoverCardPrimitive.Root data-slot="hover-card" {...props} />
}
function HoverCardTrigger({
...props
}: React.ComponentProps<typeof HoverCardPrimitive.Trigger>) {
return (
<HoverCardPrimitive.Trigger data-slot="hover-card-trigger" {...props} />
)
}
function HoverCardContent({
className,
align = "center",
sideOffset = 4,
...props
}: React.ComponentProps<typeof HoverCardPrimitive.Content>) {
return (
<HoverCardPrimitive.Portal data-slot="hover-card-portal">
<HoverCardPrimitive.Content
data-slot="hover-card-content"
align={align}
sideOffset={sideOffset}
className={cn(
"bg-popover text-popover-foreground data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0 data-[state=closed]:zoom-out-95 data-[state=open]:zoom-in-95 data-[side=bottom]:slide-in-from-top-2 data-[side=left]:slide-in-from-right-2 data-[side=right]:slide-in-from-left-2 data-[side=top]:slide-in-from-bottom-2 z-50 w-64 origin-(--radix-hover-card-content-transform-origin) rounded-md border p-4 shadow-md outline-hidden",
className
)}
{...props}
/>
</HoverCardPrimitive.Portal>
)
}
export { HoverCard, HoverCardTrigger, HoverCardContent }

View file

@ -0,0 +1,29 @@
import * as React from "react"
import * as ProgressPrimitive from "@radix-ui/react-progress"
import { cn } from "@/lib/utils"
function Progress({
className,
value,
...props
}: React.ComponentProps<typeof ProgressPrimitive.Root>) {
return (
<ProgressPrimitive.Root
data-slot="progress"
className={cn(
"bg-primary/20 relative h-2 w-full overflow-hidden rounded-full",
className
)}
{...props}
>
<ProgressPrimitive.Indicator
data-slot="progress-indicator"
className="bg-primary h-full w-full flex-1 transition-all"
style={{ transform: `translateX(-${100 - (value || 0)}%)` }}
/>
</ProgressPrimitive.Root>
)
}
export { Progress }

View file

@ -122,9 +122,11 @@ def cli(ctx, data_dir: str):
@cli.command()
@listing_filter_options
@click.option("--full", is_flag=True)
@click.pass_context
def dump_listings(
ctx: click.core.Context,
full: bool,
district: list[str],
min_bedrooms: int,
max_bedrooms: int,
@ -148,6 +150,9 @@ def dump_listings(
let_date_available_from=available_from,
last_seen_days=last_seen_days,
min_sqm=min_sqm,
radius=0,
page_size=500,
max_days_since_added=14,
)
click.echo(
f"Running dump_listings for districts {district}, data dir {data_dir} and parameters: "
@ -155,9 +160,18 @@ def dump_listings(
)
data_dir_path = pathlib.Path(data_dir)
repository = ListingRepository(engine=engine)
asyncio.run(
dump_listings_module.dump_listings(query_parameters, repository, data_dir_path)
)
if not full: # only listings
asyncio.run(
dump_listings_module.dump_listings(
query_parameters, repository, data_dir_path
)
)
else: # include images, floorplan detection etc.
asyncio.run(
dump_listings_module.dump_listings_full(
query_parameters, repository, data_dir_path
)
)
@cli.command()