payslip-ingest/payslip_ingest/db.py

66 lines
3.3 KiB
Python
Raw Normal View History

import os
from datetime import date, datetime
from decimal import Decimal
from typing import Any
from sqlalchemy import JSON, TIMESTAMP, Boolean, Date, Integer, Numeric, String, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
SCHEMA_NAME = "payslip_ingest"
class Base(DeclarativeBase):
pass
# JSONB on Postgres, plain JSON (as text) on SQLite — tests use SQLite, prod uses Postgres.
JSON_TYPE = JSONB().with_variant(JSON(), "sqlite")
class Payslip(Base):
__tablename__ = "payslip"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
paperless_doc_id: Mapped[int] = mapped_column(Integer, unique=True, nullable=False)
pay_date: Mapped[date] = mapped_column(Date, nullable=False)
pay_period_start: Mapped[date | None] = mapped_column(Date, nullable=True)
pay_period_end: Mapped[date | None] = mapped_column(Date, nullable=True)
employer: Mapped[str | None] = mapped_column(String, nullable=True)
currency: Mapped[str] = mapped_column(String(3), nullable=False, server_default="GBP")
gross_pay: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
income_tax: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
national_insurance: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
pension_employee: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
pension_employer: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
student_loan: Mapped[Decimal] = mapped_column(Numeric(12, 2),
nullable=False,
server_default=text("0"))
other_deductions: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)
net_pay: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
tax_year: Mapped[str] = mapped_column(String, nullable=False)
raw_extraction: Mapped[dict[str, Any]] = mapped_column(JSON_TYPE, nullable=False)
validated: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("true"))
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=text("now()"))
def create_engine_from_env() -> AsyncEngine:
url = os.environ["DB_CONNECTION_STRING"]
return create_async_engine(url, pool_pre_ping=True)
def make_session_factory(engine: AsyncEngine) -> async_sessionmaker[Any]:
return async_sessionmaker(engine, expire_on_commit=False)