Classification is a common LLM workload: many short inputs, one compact label per input, and a strong need for retries and cost control. This scenario shows a backend-only batch runner that calls TokenHub’s OpenAI-compatible endpoint.
Use the moonshot-v1-8k route, the model currently covered by the public smoke tests, unless your account exposes another verified model.
Input shape
id,text
1,"Cannot reset password after changing email"
2,"Invoice total does not match purchase order"
3,"The dashboard is slow during export"
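A minimal sketch of how these rows parse, assuming they are read with Python's csv.DictReader as the runner does. The `load_rows` helper and its validation rule are illustrative assumptions, and the sample string stands in for the file on disk.

```python
import csv
import io

# Stand-in for the contents of the input file shown above.
SAMPLE = '''id,text
1,"Cannot reset password after changing email"
2,"Invoice total does not match purchase order"
3,"The dashboard is slow during export"
'''


def load_rows(source) -> list[dict[str, str]]:
    """Parse CSV text into dicts and reject rows that lack an id or text."""
    rows = list(csv.DictReader(source))
    for row in rows:
        if not row.get("id") or not row.get("text"):
            raise ValueError(f"row missing id or text: {row!r}")
    return rows


rows = load_rows(io.StringIO(SAMPLE))
print(rows[0])
# {'id': '1', 'text': 'Cannot reset password after changing email'}
```

Rejecting malformed rows before any request is sent avoids paying for calls that cannot be matched back to a ticket.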
Batch runner
import asyncio
import csv
import json
import os
from pathlib import Path

import aiohttp

TOKENHUB_URL = "https://llm.sandboxclaw.com/v1/chat/completions"
API_KEY = os.environ["TOKENHUB_KEY"]
MODEL = "moonshot-v1-8k"
CONCURRENCY = 8  # maximum number of in-flight requests

SYSTEM = """Classify the user message into exactly one label:
- auth
- billing
- performance
- other
Return only the label."""


async def classify(session: aiohttp.ClientSession, row: dict[str, str]) -> dict[str, str]:
    """Classify one row, retrying 429 and 5xx responses up to three times."""
    payload = {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM},
            {"role": "user", "content": row["text"]},
        ],
        "max_tokens": 8,  # a label needs only a few tokens
        "temperature": 0,
    }
    for attempt in range(3):
        async with session.post(TOKENHUB_URL, json=payload) as resp:
            if resp.status == 429:
                # Rate limited: wait 1 s, 2 s, then 3 s.
                await asyncio.sleep(1 + attempt)
                continue
            if resp.status >= 500:
                # Server error: wait 1 s, 3 s, then 5 s.
                await asyncio.sleep(1 + attempt * 2)
                continue
            data = await resp.json()
            if resp.status >= 400:
                # Remaining 4xx responses are not retried.
                raise RuntimeError(json.dumps(data))
            label = data["choices"][0]["message"]["content"].strip()
            return {"id": row["id"], "label": label}
    raise RuntimeError(f"failed after retries: {row['id']}")


async def main() -> None:
    # Read all rows up front and close the input file before sending requests.
    with Path("tickets.csv").open(newline="") as f:
        rows = list(csv.DictReader(f))

    connector = aiohttp.TCPConnector(limit=CONCURRENCY)
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }
    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        sem = asyncio.Semaphore(CONCURRENCY)

        async def guarded(row: dict[str, str]) -> dict[str, str]:
            # The semaphore caps how many classify() calls run at once.
            async with sem:
                return await classify(session, row)

        # gather() returns results in the same order as the input rows.
        results = await asyncio.gather(*(guarded(row) for row in rows))

    with Path("labels.csv").open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["id", "label"])
        writer.writeheader()
        writer.writerows(results)


if __name__ == "__main__":
    asyncio.run(main())
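The prompt asks for exactly one of four labels, but the runner writes whatever text the model returns. A minimal sketch of normalizing the reply before it reaches labels.csv follows; `normalize_label` and the choice to fall back to `other` are illustrative assumptions, not part of the runner above.

```python
# The four labels listed in the system prompt.
ALLOWED_LABELS = {"auth", "billing", "performance", "other"}


def normalize_label(raw: str, fallback: str = "other") -> str:
    """Map a raw model reply onto an allowed label, or onto the fallback."""
    # Lowercase, then trim whitespace, quotes, backticks, and trailing periods.
    cleaned = raw.strip().lower().strip(".\"'` ")
    return cleaned if cleaned in ALLOWED_LABELS else fallback


print(normalize_label(" Billing.\n"))            # billing
print(normalize_label("I think this is auth"))   # other
```

Mapping unexpected replies to a fallback keeps the output column to a closed set. Depending on the workload, flagging such rows for review may be preferable to silently relabeling them.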
Run it:
TOKENHUB_KEY=... python classify.py
Production notes
- Keep the TokenHub key in server-side environment variables.
- Start with bounded concurrency, then request higher rate limits once the workload is stable.
- Log request IDs and failed rows so retry jobs can resume without duplicating completed work.
- Treat 400/401/402/403 as configuration errors; retrying them without changes will not help.
- Retry 429 and 5xx with backoff.
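The last three notes can be sketched as small helpers. This is an illustration under stated assumptions, not the runner's own logic: all function names are hypothetical, and the delay uses exponential backoff with full jitter, whereas the runner above uses fixed linear delays.

```python
import csv
import random
from pathlib import Path

# Status codes the notes above treat as configuration errors.
CONFIG_ERRORS = {400, 401, 402, 403}


def should_retry(status: int) -> bool:
    """Retry rate limits and server errors; never retry configuration errors."""
    if status in CONFIG_ERRORS:
        return False
    return status == 429 or status >= 500


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


def completed_ids(path: Path) -> set[str]:
    """Return ids already present in the output file, or an empty set if it is absent."""
    if not path.exists():
        return set()
    with path.open(newline="") as f:
        return {row["id"] for row in csv.DictReader(f)}


def pending_rows(rows: list[dict[str, str]], done: set[str]) -> list[dict[str, str]]:
    """Keep only the rows whose id has not been labeled yet."""
    return [row for row in rows if row["id"] not in done]


rows = [{"id": "1", "text": "a"}, {"id": "2", "text": "b"}]
print(pending_rows(rows, {"1"}))
# [{'id': '2', 'text': 'b'}]
```

A resumable job would call `completed_ids(Path("labels.csv"))` before sending requests. It would also need to append to labels.csv instead of opening it with "w", since the runner above overwrites the file on every run.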