payslip-ingest/payslip_ingest/__main__.py
Viktor Barzin 57484619c1 Initial commit: event-driven UK payslip ingest service
Extracted from /home/wizard/code monorepo into its own repo so Woodpecker CI
can watch it. Identical content to /home/wizard/code commit e426028.

See README.md for overview, env vars, and Paperless workflow config.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 22:10:23 +00:00

101 lines
3.2 KiB
Python

import asyncio
import json
import logging
import os
import subprocess
import sys
from pathlib import Path
import click
import uvicorn
from payslip_ingest.db import create_engine_from_env, make_session_factory
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.processor import process_document
from payslip_ingest.schema import validate_totals
log = logging.getLogger(__name__)
@click.group()
def cli() -> None:
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
@cli.command()
def serve() -> None:
"""Run the webhook HTTP server (K8s entrypoint)."""
uvicorn.run("payslip_ingest.app:app", host="0.0.0.0", port=8080)
@cli.command()
@click.option("--all", "process_all", is_flag=True, help="Process every payslip-tagged doc.")
@click.option("--limit", type=int, default=None, help="Cap the number of documents processed.")
@click.option("--tag", default="payslip", help="Paperless tag name to enumerate.")
def backfill(process_all: bool, limit: int | None, tag: str) -> None:
"""Enumerate every payslip-tagged Paperless doc and process sequentially."""
if not process_all:
raise click.UsageError("pass --all to opt in to the full enumeration")
asyncio.run(_backfill(tag, limit))
async def _backfill(tag: str, limit: int | None) -> None:
engine = create_engine_from_env()
session_factory = make_session_factory(engine)
paperless = PaperlessClient(
base_url=os.environ["PAPERLESS_URL"],
api_token=os.environ["PAPERLESS_API_TOKEN"],
)
extractor = ClaudeExtractor(
base_url=os.environ["CLAUDE_AGENT_URL"],
bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"],
)
processed = 0
try:
async for doc in paperless.list_tagged_documents(tag):
if limit is not None and processed >= limit:
break
doc_id = int(doc["id"])
result = await process_document(doc_id, session_factory, paperless, extractor)
click.echo(f"doc_id={doc_id} status={result.status} validated={result.validated}")
processed += 1
finally:
await paperless.aclose()
await extractor.aclose()
await engine.dispose()
@cli.command("extract-one")
@click.argument("path", type=click.Path(exists=True, dir_okay=False, path_type=Path))
def extract_one(path: Path) -> None:
"""Smoke-test extraction on a local PDF — no DB writes."""
asyncio.run(_extract_one(path))
async def _extract_one(path: Path) -> None:
pdf_bytes = path.read_bytes()
extractor = ClaudeExtractor(
base_url=os.environ["CLAUDE_AGENT_URL"],
bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"],
)
try:
extracted = await extractor.extract(pdf_bytes, {"id": None, "source": str(path)})
finally:
await extractor.aclose()
click.echo(extracted.model_dump_json(indent=2))
ok = validate_totals(extracted)
click.echo(json.dumps({"totals_validated": ok}))
if not ok:
sys.exit(1)
@cli.command()
def migrate() -> None:
"""Run `alembic upgrade head`."""
result = subprocess.run(["alembic", "upgrade", "head"], check=False)
sys.exit(result.returncode)
if __name__ == "__main__":
cli()