122 lines
4 KiB
Python
122 lines
4 KiB
Python
"""Schedule configuration for periodic scraping tasks."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from typing import Self
|
|
|
|
from pydantic import BaseModel, field_validator
|
|
|
|
from models.listing import FurnishType, ListingType, QueryParameters
|
|
|
|
# Module logger. It is bound to the "uvicorn.error" logger name rather than
# __name__ (presumably so messages go through uvicorn's handlers -- confirm
# against the application's logging setup).
logger = logging.getLogger("uvicorn.error")

# Cron field validation patterns.
#
# Each pattern accepts exactly one of:
#   - "*"
#   - a single value or a comma-separated list of values within the field range
#   - a "*/N" step (any positive integer without a leading zero for minute and
#     hour; 1-6 for day of week)
# Ranges such as "1-5" are not accepted.
#
# The patterns are anchored with \Z instead of $: in Python, $ also matches
# just before a trailing newline, so a value like "5\n" would otherwise be
# accepted by Pattern.match().
#
# NOTE(review): the minute pattern accepts a leading zero ("05") while the hour
# pattern does not ("05" is rejected, "5" is accepted) -- confirm whether that
# asymmetry is intended.
CRON_MINUTE_PATTERN = re.compile(r"^(\*|([0-5]?\d)(,[0-5]?\d)*|\*/[1-9]\d*)\Z")
CRON_HOUR_PATTERN = re.compile(r"^(\*|(1?\d|2[0-3])(,(1?\d|2[0-3]))*|\*/[1-9]\d*)\Z")
CRON_DAY_OF_WEEK_PATTERN = re.compile(r"^(\*|[0-6](,[0-6])*|\*/[1-6])\Z")
|
|
|
|
|
|
class ScheduleConfig(BaseModel):
    """Configuration for a single periodic scrape schedule.

    ``minute``, ``hour`` and ``day_of_week`` are cron-style strings that are
    checked against the module-level ``CRON_*_PATTERN`` regexes. The remaining
    fields are search filters that :meth:`to_query_parameters` copies into a
    ``QueryParameters`` instance.
    """

    name: str
    enabled: bool = True

    # Cron-style timing fields. The defaults are minute "0", hour "2" and any
    # day of the week.
    minute: str = "0"
    hour: str = "2"
    day_of_week: str = "*"

    # Search filters forwarded to QueryParameters.
    listing_type: ListingType
    min_bedrooms: int = 1
    # NOTE(review): 999 and 10_000_000 look like "no upper limit" sentinels --
    # confirm against how QueryParameters interprets them.
    max_bedrooms: int = 999
    min_price: int = 0
    max_price: int = 10_000_000
    district_names: list[str] = []
    # Raw string values; converted to FurnishType in to_query_parameters().
    furnish_types: list[str] | None = None

    @field_validator("minute")
    @classmethod
    def validate_minute(cls, v: str) -> str:
        """Validate the cron minute field.

        Accepted forms are ``*``, ``*/N``, a single value 0-59, or a
        comma-separated list of such values.

        Args:
            v: The raw minute expression.

        Returns:
            ``v`` unchanged when it is valid.

        Raises:
            ValueError: If ``v`` does not match ``CRON_MINUTE_PATTERN``.
        """
        # fullmatch (not match) so that a trailing newline, which "$" would
        # tolerate, is rejected as well.
        if not CRON_MINUTE_PATTERN.fullmatch(v):
            raise ValueError(
                f"Invalid cron minute '{v}'. Must be 0-59, *, */N, or comma-separated values."
            )
        return v

    @field_validator("hour")
    @classmethod
    def validate_hour(cls, v: str) -> str:
        """Validate the cron hour field.

        Accepted forms are ``*``, ``*/N``, a single value 0-23, or a
        comma-separated list of such values.

        Args:
            v: The raw hour expression.

        Returns:
            ``v`` unchanged when it is valid.

        Raises:
            ValueError: If ``v`` does not match ``CRON_HOUR_PATTERN``.
        """
        # fullmatch (not match) so that a trailing newline is rejected.
        if not CRON_HOUR_PATTERN.fullmatch(v):
            raise ValueError(
                f"Invalid cron hour '{v}'. Must be 0-23, *, */N, or comma-separated values."
            )
        return v

    @field_validator("day_of_week")
    @classmethod
    def validate_day_of_week(cls, v: str) -> str:
        """Validate the cron day_of_week field.

        Accepted forms are ``*``, ``*/N``, a single value 0-6, or a
        comma-separated list of such values.

        Args:
            v: The raw day-of-week expression.

        Returns:
            ``v`` unchanged when it is valid.

        Raises:
            ValueError: If ``v`` does not match ``CRON_DAY_OF_WEEK_PATTERN``.
        """
        # fullmatch (not match) so that a trailing newline is rejected.
        if not CRON_DAY_OF_WEEK_PATTERN.fullmatch(v):
            raise ValueError(
                f"Invalid cron day_of_week '{v}'. Must be 0-6, *, */N, or comma-separated values."
            )
        return v

    def to_query_parameters(self) -> QueryParameters:
        """Convert schedule config to QueryParameters for the scrape task.

        ``district_names`` is passed as a set, and each ``furnish_types`` string
        is converted with ``FurnishType(...)``. A missing or empty
        ``furnish_types`` list is passed on as ``None``.

        Returns:
            A ``QueryParameters`` instance populated from this schedule's filters.
        """
        furnish_types_enum: list[FurnishType] | None = None
        # Truthiness check: both None and [] leave the filter unset.
        if self.furnish_types:
            furnish_types_enum = [FurnishType(ft) for ft in self.furnish_types]

        return QueryParameters(
            listing_type=self.listing_type,
            min_bedrooms=self.min_bedrooms,
            max_bedrooms=self.max_bedrooms,
            min_price=self.min_price,
            max_price=self.max_price,
            district_names=set(self.district_names),
            furnish_types=furnish_types_enum,
        )
|
|
|
|
|
|
class SchedulesConfig(BaseModel):
    """Holds the list of configured scrape schedules."""

    schedules: list[ScheduleConfig] = []

    @classmethod
    def from_env(cls, env_var: str = "SCRAPE_SCHEDULES") -> Self:
        """Build the configuration from a JSON array stored in an environment variable.

        An unset, empty or whitespace-only variable yields a configuration with
        no schedules and logs an informational message.

        Args:
            env_var: Name of the environment variable holding the JSON text.

        Returns:
            A SchedulesConfig whose ``schedules`` holds one validated
            ScheduleConfig per element of the JSON array.

        Raises:
            ValueError: If the value is not valid JSON, or the decoded JSON is
                not an array. Errors raised by ``ScheduleConfig.model_validate``
                for an individual element propagate unchanged.
        """
        payload = os.getenv(env_var, "").strip()
        if payload == "":
            logger.info(f"No {env_var} configured, no periodic scrapes will be scheduled")
            return cls(schedules=[])

        try:
            decoded = json.loads(payload)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON in {env_var}: {exc}") from exc

        if isinstance(decoded, list):
            # Validate every element up front, then wrap them in the container.
            entries = list(map(ScheduleConfig.model_validate, decoded))
            return cls(schedules=entries)

        raise ValueError(f"{env_var} must be a JSON array")

    def get_enabled_schedules(self) -> list[ScheduleConfig]:
        """Return the schedules whose ``enabled`` flag is truthy, in their original order."""
        return [schedule for schedule in self.schedules if schedule.enabled]
|