wrongmove/crawler/config/schedule_config.py

122 lines
4 KiB
Python

"""Schedule configuration for periodic scraping tasks."""
from __future__ import annotations
import json
import logging
import os
import re
from typing import Self
from pydantic import BaseModel, field_validator
from models.listing import FurnishType, ListingType, QueryParameters
logger = logging.getLogger("uvicorn.error")

# Cron field validation patterns.
# Each pattern accepts "*", a single value, a comma-separated list of values,
# or a "*/N" step. They are anchored with \Z rather than $ because $ also
# matches just before a trailing newline, which would let a value such as
# "5\n" pass a .match() check.
CRON_MINUTE_PATTERN = re.compile(r"^(\*|([0-5]?\d)(,[0-5]?\d)*|\*/[1-9]\d*)\Z")
CRON_HOUR_PATTERN = re.compile(r"^(\*|(1?\d|2[0-3])(,(1?\d|2[0-3]))*|\*/[1-9]\d*)\Z")
CRON_DAY_OF_WEEK_PATTERN = re.compile(r"^(\*|[0-6](,[0-6])*|\*/[1-6])\Z")
class ScheduleConfig(BaseModel):
    """Configuration for a single periodic scrape schedule.

    The ``minute``, ``hour`` and ``day_of_week`` fields hold cron-style
    expressions and are checked against the module-level ``CRON_*_PATTERN``
    regexes. The remaining fields are search filters that
    ``to_query_parameters`` passes on to ``QueryParameters``.
    """

    name: str
    enabled: bool = True

    # Cron-style timing fields (validated below).
    minute: str = "0"
    hour: str = "2"
    day_of_week: str = "*"

    # Search filters forwarded to QueryParameters.
    listing_type: ListingType
    min_bedrooms: int = 1
    max_bedrooms: int = 999
    min_price: int = 0
    max_price: int = 10_000_000
    district_names: list[str] = []
    furnish_types: list[str] | None = None

    @field_validator("minute")
    @classmethod
    def validate_minute(cls, v: str) -> str:
        """Validate the cron minute field (0-59, *, */N, or comma-separated values).

        Raises:
            ValueError: If ``v`` is not an accepted minute expression.
        """
        # fullmatch rather than match: the whole string must be consumed, so a
        # "$"-anchored pattern cannot let a trailing newline (e.g. "5\n") through.
        if not CRON_MINUTE_PATTERN.fullmatch(v):
            raise ValueError(
                f"Invalid cron minute '{v}'. Must be 0-59, *, */N, or comma-separated values."
            )
        return v

    @field_validator("hour")
    @classmethod
    def validate_hour(cls, v: str) -> str:
        """Validate the cron hour field (0-23, *, */N, or comma-separated values).

        Raises:
            ValueError: If ``v`` is not an accepted hour expression.
        """
        # fullmatch rather than match: rejects values with a trailing newline.
        if not CRON_HOUR_PATTERN.fullmatch(v):
            raise ValueError(
                f"Invalid cron hour '{v}'. Must be 0-23, *, */N, or comma-separated values."
            )
        return v

    @field_validator("day_of_week")
    @classmethod
    def validate_day_of_week(cls, v: str) -> str:
        """Validate the cron day_of_week field (0-6, *, */N, or comma-separated values).

        Raises:
            ValueError: If ``v`` is not an accepted day-of-week expression.
        """
        # fullmatch rather than match: rejects values with a trailing newline.
        if not CRON_DAY_OF_WEEK_PATTERN.fullmatch(v):
            raise ValueError(
                f"Invalid cron day_of_week '{v}'. Must be 0-6, *, */N, or comma-separated values."
            )
        return v

    def to_query_parameters(self) -> QueryParameters:
        """Convert schedule config to QueryParameters for the scrape task.

        Returns:
            A ``QueryParameters`` built from this schedule's filter fields.
            ``district_names`` is passed as a set, and ``furnish_types`` is
            passed as ``None`` when it is unset or empty.
        """
        furnish_types_enum: list[FurnishType] | None = None
        if self.furnish_types:
            # Strings are converted here, not at validation time, so an
            # unknown furnish type surfaces when this method is called.
            furnish_types_enum = [FurnishType(ft) for ft in self.furnish_types]
        return QueryParameters(
            listing_type=self.listing_type,
            min_bedrooms=self.min_bedrooms,
            max_bedrooms=self.max_bedrooms,
            min_price=self.min_price,
            max_price=self.max_price,
            district_names=set(self.district_names),
            furnish_types=furnish_types_enum,
        )
class SchedulesConfig(BaseModel):
    """Holds the full list of configured scrape schedules."""

    schedules: list[ScheduleConfig] = []

    @classmethod
    def from_env(cls, env_var: str = "SCRAPE_SCHEDULES") -> Self:
        """Build the container from a JSON array stored in an environment variable.

        Args:
            env_var: Name of the environment variable holding the JSON config.

        Returns:
            A SchedulesConfig with one ScheduleConfig per array element, or an
            empty one when the variable is unset or blank.

        Raises:
            ValueError: If the value is not valid JSON, is not a JSON array,
                or an element fails schedule validation.
        """
        text = os.getenv(env_var, "").strip()
        if text == "":
            logger.info(f"No {env_var} configured, no periodic scrapes will be scheduled")
            return cls(schedules=[])

        try:
            payload = json.loads(text)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON in {env_var}: {exc}") from exc

        if isinstance(payload, list):
            # Validate entries one at a time, in array order.
            loaded = []
            for entry in payload:
                loaded.append(ScheduleConfig.model_validate(entry))
            return cls(schedules=loaded)

        raise ValueError(f"{env_var} must be a JSON array")

    def get_enabled_schedules(self) -> list[ScheduleConfig]:
        """Return the schedules whose ``enabled`` flag is truthy, in order."""
        active = []
        for schedule in self.schedules:
            if schedule.enabled:
                active.append(schedule)
        return active