- Rewrite csv_exporter.py to use stdlib csv.DictWriter instead of pandas DataFrame - Rewrite notifications.py to use aiohttp direct Slack webhook instead of apprise - Switch opencv-python to opencv-python-headless in pyproject.toml - Move httpx from dev to prod dependencies - Remove pandas and apprise from mypy ignore_missing_imports
62 lines
1.8 KiB
Python
62 lines
1.8 KiB
Python
import csv
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from models.listing import QueryParameters
|
|
from repositories.listing_repository import ListingRepository
|
|
|
|
|
|
DROP_COLUMNS = {"_sa_instance_state", "additional_info"}
|
|
ENSURE_COLUMNS = ("service_charge", "lease_left", "square_meters")
|
|
|
|
|
|
async def export_to_csv(
|
|
repository: ListingRepository,
|
|
output_file: Path,
|
|
query_parameters: QueryParameters | None = None,
|
|
) -> None:
|
|
listings = await repository.get_listings(query_parameters=query_parameters)
|
|
rows = [
|
|
{k: v for k, v in listing.__dict__.items() if k not in DROP_COLUMNS}
|
|
for listing in listings
|
|
]
|
|
|
|
if not rows:
|
|
output_file.write_text("")
|
|
return
|
|
|
|
# Read decisions file if present
|
|
decisions: dict[str, str] = {}
|
|
decisions_path = Path("data/decisions.json")
|
|
if decisions_path.exists():
|
|
with open(decisions_path) as f:
|
|
decisions = json.load(f)
|
|
|
|
for row in rows:
|
|
# Add decision column
|
|
row["decision"] = decisions.get(str(row.get("id")))
|
|
|
|
# Ensure optional columns exist
|
|
for col in ENSURE_COLUMNS:
|
|
row.setdefault(col, None)
|
|
|
|
# Replace -1 sentinel in square_meters
|
|
if row.get("square_meters") == -1:
|
|
row["square_meters"] = None
|
|
|
|
# Compute price_per_sqm
|
|
sqm = row.get("square_meters")
|
|
price = row.get("price")
|
|
if sqm and sqm > 0 and price:
|
|
row["price_per_sqm"] = round(price / sqm, 2)
|
|
else:
|
|
row["price_per_sqm"] = None
|
|
|
|
# Sort by price_per_sqm (None values last)
|
|
rows.sort(key=lambda r: (r["price_per_sqm"] is None, r["price_per_sqm"] or 0))
|
|
|
|
fieldnames = list(rows[0].keys())
|
|
with open(output_file, "w", newline="") as f:
|
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
writer.writerows(rows)
|