from collections.abc import AsyncIterator
from typing import Any

import httpx


class PaperlessError(RuntimeError):
    """Raised by PaperlessClient for unexpected or ambiguous API payloads."""


class PaperlessClient:
    """Async client for Paperless-ngx REST API.

    Auth uses a long-lived API token, sent with every request as the header
    ``Authorization: Token <api_token>``.

    The client can be used as an async context manager; leaving the context
    calls :meth:`aclose`.
    """

    def __init__(self, base_url: str, api_token: str, client: httpx.AsyncClient | None = None):
        """Store connection settings and pick the HTTP client to use.

        Args:
            base_url: Root URL of the Paperless-ngx instance. Trailing
                slashes are stripped so paths can be appended uniformly.
            api_token: API token placed in the ``Authorization`` header.
            client: Optional existing ``httpx.AsyncClient``. When omitted, a
                client with a 60-second timeout is created and is closed by
                :meth:`aclose`. A caller-supplied client is never closed here.
        """
        self._base_url = base_url.rstrip("/")
        self._headers = {"Authorization": f"Token {api_token}"}
        # Ownership and client selection use the same `is None` test so that
        # a client created here is always the one that gets closed.
        self._owns_client = client is None
        self._client = httpx.AsyncClient(timeout=60.0) if client is None else client

    async def aclose(self) -> None:
        """Close the underlying HTTP client if this instance created it."""
        if self._owns_client:
            await self._client.aclose()

    async def __aenter__(self) -> "PaperlessClient":
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.aclose()

    @staticmethod
    def _json_object(resp: httpx.Response, what: str) -> dict[str, Any]:
        """Check the HTTP status of *resp* and return its JSON body as a dict.

        Args:
            resp: Response to validate.
            what: Short description of the payload, used in the error message.

        Raises:
            httpx.HTTPStatusError: From ``raise_for_status`` on an error status.
            PaperlessError: If the decoded JSON body is not an object.
        """
        resp.raise_for_status()
        data = resp.json()
        if not isinstance(data, dict):
            raise PaperlessError(f"Unexpected {what}: {type(data)}")
        return data

    async def get_document(self, doc_id: int) -> dict[str, Any]:
        """Fetch the JSON record of document *doc_id*.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
            PaperlessError: If the response body is not a JSON object.
        """
        resp = await self._client.get(
            f"{self._base_url}/api/documents/{doc_id}/",
            headers=self._headers,
        )
        return self._json_object(resp, f"document payload for {doc_id}")

    async def download_document(self, doc_id: int) -> bytes:
        """Return the raw bytes served by the download endpoint for *doc_id*.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        resp = await self._client.get(
            f"{self._base_url}/api/documents/{doc_id}/download/",
            headers=self._headers,
        )
        resp.raise_for_status()
        return resp.content

    async def get_tag_id(self, tag_name: str) -> int:
        """Resolve *tag_name* to its numeric tag id.

        The lookup filters with ``name__iexact`` and requires exactly one
        match.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
            PaperlessError: If the payload is malformed, no tag or more than
                one tag matches, or the tag id is missing or not an int.
        """
        resp = await self._client.get(
            f"{self._base_url}/api/tags/",
            headers=self._headers,
            params={"name__iexact": tag_name},
        )
        page = self._json_object(resp, f"tag list payload for {tag_name!r}")
        results = page.get("results", [])
        if not isinstance(results, list):
            raise PaperlessError(f"Unexpected tag results for {tag_name!r}: {type(results)}")
        if not results:
            raise PaperlessError(f"No tag named {tag_name!r}")
        if len(results) > 1:
            raise PaperlessError(f"Multiple tags matched {tag_name!r}: {len(results)}")
        try:
            tag_id = results[0]["id"]
        except (KeyError, TypeError) as err:
            # Entry is not a mapping or carries no "id" key.
            raise PaperlessError(f"Tag entry for {tag_name!r} has no id: {results[0]!r}") from err
        if not isinstance(tag_id, int):
            raise PaperlessError(f"Tag id is not int: {tag_id!r}")
        return tag_id

    async def list_tagged_documents(self, tag_name: str) -> AsyncIterator[dict[str, Any]]:
        """Yield every document carrying the tag *tag_name*.

        The tag is resolved with :meth:`get_tag_id`, then result pages are
        fetched one at a time by following each page's ``next`` URL until it
        is absent or empty.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
            PaperlessError: If the tag cannot be resolved or a page payload
                is malformed.
        """
        tag_id = await self.get_tag_id(tag_name)
        next_url: str | None = f"{self._base_url}/api/documents/?tags__id={tag_id}"
        while next_url:
            resp = await self._client.get(next_url, headers=self._headers)
            page = self._json_object(resp, f"document list payload for tag {tag_name!r}")
            results = page.get("results", [])
            if not isinstance(results, list):
                raise PaperlessError(
                    f"Unexpected document results for tag {tag_name!r}: {type(results)}"
                )
            for item in results:
                yield item
            next_url = page.get("next")