wrongmove/repositories/decision_repository.py
Viktor Barzin 9e1beb7495
Add listing decisions (like/dislike) backend with detail endpoint
- ListingDecision model with unique constraint on (user_id, listing_id, listing_type)
- Alembic migration for listingdecision table
- DecisionRepository with dialect-aware upsert (MySQL/SQLite)
- DecisionService with input validation
- Decision API routes: PUT/GET/DELETE on /api/decisions
- GET /api/listing/{id}/detail endpoint extracting full property info from additional_info
- Add listing ID to GeoJSON feature properties
- Decision filtering on GeoJSON stream endpoint (decision_filter param)
2026-02-21 15:49:10 +00:00

114 lines
4 KiB
Python

from datetime import datetime
from models.decision import ListingDecision
from sqlalchemy import Engine
from sqlmodel import Session, select
class DecisionRepository:
engine: Engine
def __init__(self, engine: Engine) -> None:
self.engine = engine
def upsert_decision(
self,
user_id: int,
listing_id: int,
listing_type: str,
decision: str,
) -> ListingDecision:
"""Create or update a decision. Uses dialect-specific upsert."""
with Session(self.engine) as session:
now = datetime.utcnow()
values = {
"user_id": user_id,
"listing_id": listing_id,
"listing_type": listing_type,
"decision": decision,
"created_at": now,
"updated_at": now,
}
dialect = self.engine.dialect.name
if dialect == "mysql":
from sqlalchemy.dialects.mysql import insert as mysql_insert
stmt = mysql_insert(ListingDecision).values(**values)
stmt = stmt.on_duplicate_key_update(
decision=stmt.inserted.decision,
updated_at=stmt.inserted.updated_at,
)
else:
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
stmt = sqlite_insert(ListingDecision).values(**values)
stmt = stmt.on_conflict_do_update(
index_elements=["user_id", "listing_id", "listing_type"],
set_={
"decision": stmt.excluded.decision,
"updated_at": stmt.excluded.updated_at,
},
)
session.execute(stmt)
session.commit()
# Fetch the result
result = session.exec(
select(ListingDecision).where(
ListingDecision.user_id == user_id,
ListingDecision.listing_id == listing_id,
ListingDecision.listing_type == listing_type,
)
).first()
assert result is not None
return result
def get_decisions_for_user(self, user_id: int) -> list[ListingDecision]:
with Session(self.engine) as session:
statement = select(ListingDecision).where(
ListingDecision.user_id == user_id
)
return list(session.exec(statement).all())
def delete_decision(
self,
user_id: int,
listing_id: int,
listing_type: str,
) -> bool:
with Session(self.engine) as session:
result = session.exec(
select(ListingDecision).where(
ListingDecision.user_id == user_id,
ListingDecision.listing_id == listing_id,
ListingDecision.listing_type == listing_type,
)
).first()
if result is None:
return False
session.delete(result)
session.commit()
return True
def get_disliked_listing_ids(
self,
user_id: int,
listing_type: str,
) -> set[int]:
with Session(self.engine) as session:
statement = select(ListingDecision.listing_id).where(
ListingDecision.user_id == user_id,
ListingDecision.listing_type == listing_type,
ListingDecision.decision == "disliked",
)
return {row for row in session.exec(statement).all()}
def get_liked_listing_ids(
self,
user_id: int,
listing_type: str,
) -> set[int]:
with Session(self.engine) as session:
statement = select(ListingDecision.listing_id).where(
ListingDecision.user_id == user_id,
ListingDecision.listing_type == listing_type,
ListingDecision.decision == "liked",
)
return {row for row in session.exec(statement).all()}