import asyncio
import contextlib
import json
import logging
import os
import subprocess
import sys
from pathlib import Path

import click
import uvicorn

from payslip_ingest.db import create_engine_from_env, make_session_factory
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.processor import process_document
from payslip_ingest.schema import validate_totals

log = logging.getLogger(__name__)


def _log_level_from_env() -> int | str:
    """Return the level named by ``LOG_LEVEL`` in a form ``logging`` accepts.

    ``logging.basicConfig`` only recognises upper-case level names, so a value
    such as ``debug`` would otherwise raise ``ValueError`` before any command
    runs. Numeric levels (``"10"``) are converted to ``int``; an unset or blank
    variable falls back to ``INFO``.
    """
    raw = os.environ.get("LOG_LEVEL", "").strip() or "INFO"
    if raw.isdigit():
        return int(raw)
    return raw.upper()


def _paperless_from_env() -> PaperlessClient:
    """Build a Paperless client from ``PAPERLESS_URL`` / ``PAPERLESS_API_TOKEN``.

    Raises ``KeyError`` if either variable is unset.
    """
    return PaperlessClient(
        base_url=os.environ["PAPERLESS_URL"],
        api_token=os.environ["PAPERLESS_API_TOKEN"],
    )


def _extractor_from_env() -> ClaudeExtractor:
    """Build a Claude extractor from ``CLAUDE_AGENT_URL`` / ``CLAUDE_AGENT_BEARER_TOKEN``.

    Raises ``KeyError`` if either variable is unset.
    """
    return ClaudeExtractor(
        base_url=os.environ["CLAUDE_AGENT_URL"],
        bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"],
    )


# The group callback runs before every subcommand, so logging is configured
# exactly once here.
@click.group()
def cli() -> None:
    logging.basicConfig(level=_log_level_from_env())


@cli.command()
def serve() -> None:
    """Run the webhook HTTP server (K8s entrypoint)."""
    uvicorn.run("payslip_ingest.app:app", host="0.0.0.0", port=8080)


@cli.command()
@click.option("--all", "process_all", is_flag=True, help="Process every payslip-tagged doc.")
@click.option("--limit", type=int, default=None, help="Cap the number of documents processed.")
@click.option("--tag", default="payslip", help="Paperless tag name to enumerate.")
def backfill(process_all: bool, limit: int | None, tag: str) -> None:
    """Enumerate every payslip-tagged Paperless doc and process sequentially."""
    # Require an explicit opt-in so a bare `backfill` never walks the whole archive.
    if not process_all:
        raise click.UsageError("pass --all to opt in to the full enumeration")
    asyncio.run(_backfill(tag, limit))


async def _backfill(tag: str, limit: int | None) -> None:
    """Process every Paperless document tagged ``tag``, one at a time.

    ``limit`` caps the number of documents attempted; successes and failures
    both count toward it. A failure on one document is reported and counted,
    and the loop moves on to the next.

    Every resource opened here is released on exit, even when a later one
    fails to construct (for example because an env var is missing) or an
    earlier one's close raises. ``AsyncExitStack`` runs the callbacks LIFO:
    extractor, then Paperless, then the DB engine.
    """
    async with contextlib.AsyncExitStack() as stack:
        engine = create_engine_from_env()
        stack.push_async_callback(engine.dispose)
        session_factory = make_session_factory(engine)

        paperless = _paperless_from_env()
        stack.push_async_callback(paperless.aclose)

        extractor = _extractor_from_env()
        stack.push_async_callback(extractor.aclose)

        # Resolve the P60 tag if present. The dispatch branch needs it even
        # when backfilling a non-p60 tag: a P60-tagged doc that also carries
        # the payslip tag should still route to the P60 handler. Failure is
        # deliberately non-fatal; dispatch is simply disabled.
        p60_tag_id: int | None = None
        try:
            p60_tag_id = await paperless.get_tag_id("p60")
        except Exception as exc:
            click.echo(f"warning: p60 tag resolution failed — dispatch disabled: {exc}", err=True)

        processed = 0
        failed = 0
        async for doc in paperless.list_tagged_documents(tag):
            if limit is not None and processed >= limit:
                break
            doc_id = int(doc["id"])
            try:
                result = await process_document(doc_id, session_factory, paperless, extractor, p60_tag_id)
                click.echo(f"doc_id={doc_id} status={result.status} validated={result.validated}")
            except Exception as exc:
                # Don't let a single bad doc (wrong tag, non-payslip PDF, Claude
                # hallucinating null fields) abort the whole backfill. Log and continue.
                failed += 1
                click.echo(f"doc_id={doc_id} status=failed error={type(exc).__name__}: {exc}", err=True)
                log.exception("backfill: doc_id=%s failed", doc_id)
            processed += 1
        click.echo(f"backfill complete: processed={processed} failed={failed}")


@cli.command("extract-one")
@click.argument("path", type=click.Path(exists=True, dir_okay=False, path_type=Path))
def extract_one(path: Path) -> None:
    """Smoke-test extraction on a local PDF — no DB writes."""
    asyncio.run(_extract_one(path))


async def _extract_one(path: Path) -> None:
    """Extract one local PDF, print the result, and exit 1 if totals don't validate."""
    pdf_bytes = path.read_bytes()
    extractor = _extractor_from_env()
    try:
        extracted = await extractor.extract(pdf_bytes, {"id": None, "source": str(path)})
    finally:
        await extractor.aclose()
    click.echo(extracted.model_dump_json(indent=2))
    ok = validate_totals(extracted)
    click.echo(json.dumps({"totals_validated": ok}))
    if not ok:
        sys.exit(1)


@cli.command()
def migrate() -> None:
    """Run `alembic upgrade head`."""
    # Propagate alembic's exit status as this command's exit status.
    result = subprocess.run(["alembic", "upgrade", "head"], check=False)
    sys.exit(result.returncode)


@cli.command("sync-meta-deposits")
def sync_meta_deposits_cmd() -> None:
    """Pull Meta payroll deposits from ActualBudget into external_meta_deposits.

    Reads from the jhonderson/actual-http-api sidecar. Requires env vars:
    ACTUALBUDGET_HTTP_API_URL, ACTUALBUDGET_API_KEY,
    ACTUALBUDGET_ENCRYPTION_PASSWORD, ACTUALBUDGET_BUDGET_SYNC_ID.
    """
    asyncio.run(_sync_meta_deposits())


async def _sync_meta_deposits() -> None:
    """Run the ActualBudget sync and print its summary counters.

    The engine is disposed even if building the ActualBudget client fails,
    for example because one of its env vars is unset.
    """
    # Imported here rather than at module top. Presumably this keeps the sync
    # dependency off the import path of the other commands — verify.
    from payslip_ingest.sync.actualbudget import ActualBudgetClient, sync_meta_deposits

    async with contextlib.AsyncExitStack() as stack:
        engine = create_engine_from_env()
        stack.push_async_callback(engine.dispose)
        session_factory = make_session_factory(engine)

        client = ActualBudgetClient(
            base_url=os.environ["ACTUALBUDGET_HTTP_API_URL"],
            api_key=os.environ["ACTUALBUDGET_API_KEY"],
            encryption_password=os.environ["ACTUALBUDGET_ENCRYPTION_PASSWORD"],
            budget_sync_id=os.environ["ACTUALBUDGET_BUDGET_SYNC_ID"],
        )
        stack.push_async_callback(client.aclose)

        result = await sync_meta_deposits(client, session_factory)
        click.echo(
            f"sync complete: accounts={result.accounts_scanned} "
            f"transactions={result.transactions_fetched} "
            f"meta_matched={result.meta_deposits_matched} "
            f"inserted={result.inserted} existing={result.skipped_existing}"
        )


@cli.command("backfill-cash-tax")
@click.option("--limit", type=int, default=None, help="Cap the number of rows processed.")
def backfill_cash_tax(limit: int | None) -> None:
    """Back-fill cash_income_tax on rows where it's NULL (vest months only).

    Uses the widened regex parser first; falls back to Claude. Writes the
    provenance source into `cash_income_tax_source`. Idempotent — only
    touches NULL rows.
    """
    asyncio.run(_backfill_cash_tax(limit))


async def _backfill_cash_tax(limit: int | None) -> None:
    """Run the cash-income-tax back-fill and print its summary counters.

    Every resource opened here is released on exit, even when a later one
    fails to construct or an earlier one's close raises.
    """
    from payslip_ingest.backfill_cash_tax import backfill_cash_income_tax

    async with contextlib.AsyncExitStack() as stack:
        engine = create_engine_from_env()
        stack.push_async_callback(engine.dispose)
        session_factory = make_session_factory(engine)

        paperless = _paperless_from_env()
        stack.push_async_callback(paperless.aclose)

        extractor = _extractor_from_env()
        stack.push_async_callback(extractor.aclose)

        result = await backfill_cash_income_tax(session_factory, paperless, extractor, limit=limit)
        click.echo(
            f"back-fill complete: processed={result.processed} "
            f"regex={result.regex_hits} claude={result.claude_hits} "
            f"fallback_null={result.fallback_null} errors={result.errors}"
        )


if __name__ == "__main__":
    cli()