import asyncio
import contextlib
import json
import logging
import os
import subprocess
import sys
from pathlib import Path

import click
import uvicorn

from payslip_ingest.db import create_engine_from_env, make_session_factory
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.processor import process_document
from payslip_ingest.schema import validate_totals

log = logging.getLogger(__name__)


@click.group()
def cli() -> None:
    """Payslip ingestion command-line interface."""
    # logging only registers upper-case level names ("DEBUG", "INFO", ...);
    # normalise so LOG_LEVEL=debug doesn't make basicConfig raise ValueError.
    level = os.environ.get("LOG_LEVEL", "INFO").strip().upper()
    logging.basicConfig(level=level)


@cli.command()
def serve() -> None:
    """Run the webhook HTTP server (K8s entrypoint)."""
    # Binds on all interfaces so the server is reachable from outside the container.
    uvicorn.run("payslip_ingest.app:app", host="0.0.0.0", port=8080)


@cli.command()
@click.option("--all", "process_all", is_flag=True, help="Process every payslip-tagged doc.")
@click.option("--limit", type=int, default=None, help="Cap the number of documents processed.")
@click.option("--tag", default="payslip", help="Paperless tag name to enumerate.")
def backfill(process_all: bool, limit: int | None, tag: str) -> None:
    """Enumerate every payslip-tagged Paperless doc and process sequentially."""
    # Require an explicit opt-in so a bare `backfill` can't kick off a full run.
    if not process_all:
        raise click.UsageError("pass --all to opt in to the full enumeration")
    asyncio.run(_backfill(tag, limit))


async def _backfill(tag: str, limit: int | None) -> None:
    """Process every Paperless document carrying ``tag``, one at a time.

    Per-document failures are echoed to stderr, logged, and counted, but never
    abort the run. ``limit`` caps the number of documents attempted (successes
    and failures both count). The DB engine and both HTTP clients are released
    on exit, including when a constructor (e.g. a missing env var) or another
    cleanup call raises.

    Raises:
        KeyError: if a required environment variable is missing.
    """
    async with contextlib.AsyncExitStack() as stack:
        # Register each cleanup right after its resource exists so a later
        # failure during setup still releases everything created so far.
        engine = create_engine_from_env()
        stack.push_async_callback(engine.dispose)
        session_factory = make_session_factory(engine)

        paperless = PaperlessClient(
            base_url=os.environ["PAPERLESS_URL"],
            api_token=os.environ["PAPERLESS_API_TOKEN"],
        )
        stack.push_async_callback(paperless.aclose)

        extractor = ClaudeExtractor(
            base_url=os.environ["CLAUDE_AGENT_URL"],
            bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"],
        )
        stack.push_async_callback(extractor.aclose)

        processed = 0
        failed = 0
        async for doc in paperless.list_tagged_documents(tag):
            if limit is not None and processed >= limit:
                break
            doc_id: int | None = None
            try:
                # Parsing the id is inside the try so a malformed listing entry
                # is treated like any other bad doc instead of aborting the run.
                doc_id = int(doc["id"])
                result = await process_document(doc_id, session_factory, paperless, extractor)
            except Exception as exc:
                # Don't let a single bad doc (wrong tag, non-payslip PDF, Claude
                # hallucinating null fields) abort the whole backfill. Log + continue.
                failed += 1
                click.echo(f"doc_id={doc_id} status=failed error={type(exc).__name__}: {exc}", err=True)
                log.exception("backfill: doc_id=%s failed", doc_id)
            else:
                click.echo(f"doc_id={doc_id} status={result.status} validated={result.validated}")
            processed += 1

        click.echo(f"backfill complete: processed={processed} failed={failed}")


@cli.command("extract-one")
@click.argument("path", type=click.Path(exists=True, dir_okay=False, path_type=Path))
def extract_one(path: Path) -> None:
    """Smoke-test extraction on a local PDF — no DB writes."""
    asyncio.run(_extract_one(path))


async def _extract_one(path: Path) -> None:
    """Extract ``path`` with Claude, print the result, and exit 1 if totals don't validate."""
    pdf_bytes = path.read_bytes()
    extractor = ClaudeExtractor(
        base_url=os.environ["CLAUDE_AGENT_URL"],
        bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"],
    )
    try:
        # No Paperless document exists here, so pass a synthetic metadata
        # dict that identifies the local file instead.
        extracted = await extractor.extract(pdf_bytes, {"id": None, "source": str(path)})
    finally:
        await extractor.aclose()

    click.echo(extracted.model_dump_json(indent=2))
    ok = validate_totals(extracted)
    click.echo(json.dumps({"totals_validated": ok}))
    if not ok:
        # Non-zero exit lets scripts/CI detect a totals mismatch.
        sys.exit(1)


@cli.command()
def migrate() -> None:
    """Run `alembic upgrade head`."""
    # check=False: propagate alembic's own exit status rather than a traceback.
    result = subprocess.run(["alembic", "upgrade", "head"], check=False)
    sys.exit(result.returncode)


if __name__ == "__main__":
    cli()