From 9d308c36dc3391e81fb09fdbcf428783fbc37b01 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Tue, 2 Jun 2026 13:54:15 +0000 Subject: [PATCH] examples: serialize LLM calls via Semaphore (default 1) to dodge CAS busy-lock --- fire_planner/examples/cli.py | 40 +++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/fire_planner/examples/cli.py b/fire_planner/examples/cli.py index 19ded31..ed50bd7 100644 --- a/fire_planner/examples/cli.py +++ b/fire_planner/examples/cli.py @@ -52,20 +52,38 @@ async def ingest_subreddit( claude_bearer: str, client: httpx.AsyncClient, fx_rates: dict[str, Decimal], + llm_semaphore: asyncio.Semaphore | None = None, ) -> tuple[int, int]: + """Yield (inserted, skipped) counts for one (sub, when) bucket. + + `llm_semaphore` serializes the LLM call across parallel sub-runs; + claude-agent-service's `/v1/chat/completions` has a single-flight + busy-lock, so 12 concurrent fan-outs trample each other. Default + None = no serialization (test path). + """ inserted = 0 skipped = 0 async for post in fetch_top(reddit, sub, when, limit=limit): if not is_candidate(post): skipped += 1 continue - extracted = await extract_with_fallback( - post, - llama_url=llama_url, - claude_url=claude_url, - claude_bearer=claude_bearer, - client=client, - ) + if llm_semaphore is not None: + async with llm_semaphore: + extracted = await extract_with_fallback( + post, + llama_url=llama_url, + claude_url=claude_url, + claude_bearer=claude_bearer, + client=client, + ) + else: + extracted = await extract_with_fallback( + post, + llama_url=llama_url, + claude_url=claude_url, + claude_bearer=claude_bearer, + client=client, + ) if extracted is None: log.info("dropping %s — both LLM tiers failed", post.reddit_id) skipped += 1 @@ -96,6 +114,13 @@ async def _ingest_all( claude_url = os.environ["CLAUDE_AGENT_SERVICE_URL"] claude_bearer = os.environ["CLAUDE_AGENT_BEARER"] + # Cap concurrent LLM calls across all sub-runs. claude-agent-service's + # /v1/chat/completions busy-locks (single-flight), so 12-sub fan-out + # otherwise loses 11 calls to 503. Default 1 = strict serial. Bump + # to 2-3 once the busy-lock is dropped for the chat endpoint. + llm_concurrency = int(os.environ.get("LLM_CONCURRENCY", "1")) + llm_semaphore = asyncio.Semaphore(llm_concurrency) + async def _one(sub: str, when: TopWhen) -> tuple[int, int]: async with factory() as session, httpx.AsyncClient() as client: return await ingest_subreddit( @@ -106,6 +131,7 @@ async def _ingest_all( claude_bearer=claude_bearer, client=client, fx_rates=rates, + llm_semaphore=llm_semaphore, ) tasks = [_one(s, w) for s in subs for w in when_list]