- Replace deprecated datetime.utcnow() with datetime.now(UTC) in model and repository - Add listing_type validation to decision_service (RENT/BUY only) - Fix decision filtering tests failing due to rate limiting by patching _match_endpoint - Add SwipeCard component test suite (11 tests covering rendering, interactions, and POI distances) - Add test for invalid listing_type validation
75 lines
2.7 KiB
Python
75 lines
2.7 KiB
Python
"""Repository for listing decisions (like/dislike)."""
|
|
from datetime import datetime, UTC
|
|
|
|
from models.decision import ListingDecision
|
|
from sqlalchemy import Engine
|
|
from sqlmodel import Session, select
|
|
|
|
|
|
class DecisionRepository:
|
|
engine: Engine
|
|
|
|
def __init__(self, engine: Engine) -> None:
|
|
self.engine = engine
|
|
|
|
def upsert_decision(
|
|
self, user_id: int, listing_id: int, listing_type: str, decision: str
|
|
) -> ListingDecision:
|
|
with Session(self.engine) as session:
|
|
statement = select(ListingDecision).where(
|
|
ListingDecision.user_id == user_id,
|
|
ListingDecision.listing_id == listing_id,
|
|
ListingDecision.listing_type == listing_type,
|
|
)
|
|
existing = session.exec(statement).first()
|
|
if existing:
|
|
existing.decision = decision
|
|
existing.updated_at = datetime.now(UTC)
|
|
session.add(existing)
|
|
session.commit()
|
|
session.refresh(existing)
|
|
return existing
|
|
new_decision = ListingDecision(
|
|
user_id=user_id,
|
|
listing_id=listing_id,
|
|
listing_type=listing_type,
|
|
decision=decision,
|
|
)
|
|
session.add(new_decision)
|
|
session.commit()
|
|
session.refresh(new_decision)
|
|
return new_decision
|
|
|
|
def get_decisions_for_user(self, user_id: int) -> list[ListingDecision]:
|
|
with Session(self.engine) as session:
|
|
statement = select(ListingDecision).where(
|
|
ListingDecision.user_id == user_id
|
|
)
|
|
return list(session.exec(statement).all())
|
|
|
|
def delete_decision(
|
|
self, user_id: int, listing_id: int, listing_type: str
|
|
) -> bool:
|
|
with Session(self.engine) as session:
|
|
statement = select(ListingDecision).where(
|
|
ListingDecision.user_id == user_id,
|
|
ListingDecision.listing_id == listing_id,
|
|
ListingDecision.listing_type == listing_type,
|
|
)
|
|
existing = session.exec(statement).first()
|
|
if existing is None:
|
|
return False
|
|
session.delete(existing)
|
|
session.commit()
|
|
return True
|
|
|
|
def get_disliked_listing_ids(
|
|
self, user_id: int, listing_type: str
|
|
) -> set[int]:
|
|
with Session(self.engine) as session:
|
|
statement = select(ListingDecision.listing_id).where(
|
|
ListingDecision.user_id == user_id,
|
|
ListingDecision.listing_type == listing_type,
|
|
ListingDecision.decision == "disliked",
|
|
)
|
|
return {row for row in session.exec(statement).all()}
|