examples: serialize LLM calls via Semaphore (default 1) to dodge CAS busy-lock
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
This commit is contained in:
parent
c1c1e2202b
commit
9d308c36dc
1 changed files with 33 additions and 7 deletions
|
|
@ -52,20 +52,38 @@ async def ingest_subreddit(
|
||||||
claude_bearer: str,
|
claude_bearer: str,
|
||||||
client: httpx.AsyncClient,
|
client: httpx.AsyncClient,
|
||||||
fx_rates: dict[str, Decimal],
|
fx_rates: dict[str, Decimal],
|
||||||
|
llm_semaphore: asyncio.Semaphore | None = None,
|
||||||
) -> tuple[int, int]:
|
) -> tuple[int, int]:
|
||||||
|
"""Yield (inserted, skipped) counts for one (sub, when) bucket.
|
||||||
|
|
||||||
|
`llm_semaphore` serializes the LLM call across parallel sub-runs;
|
||||||
|
claude-agent-service's `/v1/chat/completions` has a single-flight
|
||||||
|
busy-lock, so 12 concurrent fan-outs trample each other. Default
|
||||||
|
None = no serialization (test path).
|
||||||
|
"""
|
||||||
inserted = 0
|
inserted = 0
|
||||||
skipped = 0
|
skipped = 0
|
||||||
async for post in fetch_top(reddit, sub, when, limit=limit):
|
async for post in fetch_top(reddit, sub, when, limit=limit):
|
||||||
if not is_candidate(post):
|
if not is_candidate(post):
|
||||||
skipped += 1
|
skipped += 1
|
||||||
continue
|
continue
|
||||||
extracted = await extract_with_fallback(
|
if llm_semaphore is not None:
|
||||||
post,
|
async with llm_semaphore:
|
||||||
llama_url=llama_url,
|
extracted = await extract_with_fallback(
|
||||||
claude_url=claude_url,
|
post,
|
||||||
claude_bearer=claude_bearer,
|
llama_url=llama_url,
|
||||||
client=client,
|
claude_url=claude_url,
|
||||||
)
|
claude_bearer=claude_bearer,
|
||||||
|
client=client,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
extracted = await extract_with_fallback(
|
||||||
|
post,
|
||||||
|
llama_url=llama_url,
|
||||||
|
claude_url=claude_url,
|
||||||
|
claude_bearer=claude_bearer,
|
||||||
|
client=client,
|
||||||
|
)
|
||||||
if extracted is None:
|
if extracted is None:
|
||||||
log.info("dropping %s — both LLM tiers failed", post.reddit_id)
|
log.info("dropping %s — both LLM tiers failed", post.reddit_id)
|
||||||
skipped += 1
|
skipped += 1
|
||||||
|
|
@ -96,6 +114,13 @@ async def _ingest_all(
|
||||||
claude_url = os.environ["CLAUDE_AGENT_SERVICE_URL"]
|
claude_url = os.environ["CLAUDE_AGENT_SERVICE_URL"]
|
||||||
claude_bearer = os.environ["CLAUDE_AGENT_BEARER"]
|
claude_bearer = os.environ["CLAUDE_AGENT_BEARER"]
|
||||||
|
|
||||||
|
# Cap concurrent LLM calls across all sub-runs. claude-agent-service's
|
||||||
|
# /v1/chat/completions busy-locks (single-flight), so 12-sub fan-out
|
||||||
|
# otherwise loses 11 calls to 503. Default 1 = strict serial. Bump
|
||||||
|
# to 2-3 once the busy-lock is dropped for the chat endpoint.
|
||||||
|
llm_concurrency = int(os.environ.get("LLM_CONCURRENCY", "1"))
|
||||||
|
llm_semaphore = asyncio.Semaphore(llm_concurrency)
|
||||||
|
|
||||||
async def _one(sub: str, when: TopWhen) -> tuple[int, int]:
|
async def _one(sub: str, when: TopWhen) -> tuple[int, int]:
|
||||||
async with factory() as session, httpx.AsyncClient() as client:
|
async with factory() as session, httpx.AsyncClient() as client:
|
||||||
return await ingest_subreddit(
|
return await ingest_subreddit(
|
||||||
|
|
@ -106,6 +131,7 @@ async def _ingest_all(
|
||||||
claude_bearer=claude_bearer,
|
claude_bearer=claude_bearer,
|
||||||
client=client,
|
client=client,
|
||||||
fx_rates=rates,
|
fx_rates=rates,
|
||||||
|
llm_semaphore=llm_semaphore,
|
||||||
)
|
)
|
||||||
|
|
||||||
tasks = [_one(s, w) for s in subs for w in when_list]
|
tasks = [_one(s, w) for s in subs for w in when_list]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue