Skip to main content
token-hub

Batch classification with token-hub

Run large-volume classification jobs through token-hub with aiohttp, bounded concurrency, retries, and the verified moonshot-v1-8k route.

Classification is a common LLM workload: many short inputs, one compact label for each input, and a strong need for retries and cost control. This scenario shows a backend-only batch runner against token-hub’s OpenAI-compatible endpoint.

Use the moonshot-v1-8k route, which is the current publicly smoke-tested route, unless your account exposes another verified model.

Input shape (tickets.csv)

id,text
1,"Cannot reset password after changing email"
2,"Invoice total does not match purchase order"
3,"The dashboard is slow during export"

Batch runner

import asyncio
import csv
import json
import os
from pathlib import Path

import aiohttp

# OpenAI-compatible chat-completions endpoint that classify() POSTs to.
TOKENHUB_URL = "https://llm.sandboxclaw.com/v1/chat/completions"
# Bearer token, read once at import time; an unset TOKENHUB_KEY raises KeyError.
API_KEY = os.environ["TOKENHUB_KEY"]
# Model name sent in the "model" field of every request payload.
MODEL = "moonshot-v1-8k"
# Used both as the TCP connector limit and as the semaphore size in main().
CONCURRENCY = 8

# System prompt sent with every request; it asks for exactly one of four labels.
SYSTEM = """Classify the user message into exactly one label:
- auth
- billing
- performance
- other

Return only the label."""


async def classify(session: aiohttp.ClientSession, row: dict[str, str]) -> dict[str, str]:
    """Classify one input row and return its ``{"id", "label"}`` result.

    The request is attempted up to three times. HTTP 429, HTTP 5xx and
    transport-level failures (connection errors, timeouts) are retried with
    a linearly growing delay; any other HTTP error (4xx) is raised
    immediately because retrying cannot fix it.

    Args:
        session: Open aiohttp session used to POST the request.
        row: Input record; the keys ``"id"`` and ``"text"`` are read.

    Returns:
        A dict holding the row's ``"id"`` and the model output, stripped of
        surrounding whitespace, as ``"label"``.

    Raises:
        RuntimeError: On a non-retryable 4xx response (the message is the
            response body) or when every attempt failed.
    """
    payload = {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM},
            {"role": "user", "content": row["text"]},
        ],
        "max_tokens": 8,
        "temperature": 0,
    }

    attempts = 3
    last_error = None

    for attempt in range(attempts):
        # Only a transport error from the *final* attempt is chained below.
        last_error = None
        try:
            async with session.post(TOKENHUB_URL, json=payload) as resp:
                if resp.status == 429:
                    delay = 1 + attempt
                elif resp.status >= 500:
                    delay = 1 + attempt * 2
                elif resp.status >= 400:
                    # Read the body as text first so a non-JSON error page
                    # still surfaces as RuntimeError instead of a parse error.
                    body = await resp.text()
                    try:
                        detail = json.dumps(json.loads(body))
                    except ValueError:
                        detail = body
                    raise RuntimeError(detail)
                else:
                    data = await resp.json()
                    label = data["choices"][0]["message"]["content"].strip()
                    return {"id": row["id"], "label": label}
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            # Dropped connections and timeouts are transient: retry them the
            # same way as a 5xx instead of failing the row on the first one.
            last_error = exc
            delay = 1 + attempt * 2

        # Back off outside ``async with`` so the connection is returned to
        # the pool while waiting, and skip the wait after the last attempt.
        if attempt < attempts - 1:
            await asyncio.sleep(delay)

    raise RuntimeError(f"failed after retries: {row['id']}") from last_error


async def main() -> None:
    """Classify every row of ``tickets.csv`` and write ``labels.csv``.

    Rows are classified concurrently, with at most ``CONCURRENCY`` requests
    in flight. Labels for all rows that succeeded are written even when
    some rows fail, so completed work is not thrown away.

    Raises:
        RuntimeError: After ``labels.csv`` has been written, if one or more
            rows could not be classified. The first failure is chained as
            the cause.
    """
    # Use a context manager so the input handle is closed; newline="" is
    # what the csv module expects so quoted fields with embedded newlines
    # are parsed correctly.
    with Path("tickets.csv").open(newline="", encoding="utf-8") as src:
        rows = list(csv.DictReader(src))

    connector = aiohttp.TCPConnector(limit=CONCURRENCY)
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }

    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        sem = asyncio.Semaphore(CONCURRENCY)

        async def guarded(row: dict[str, str]) -> dict[str, str]:
            async with sem:
                return await classify(session, row)

        # return_exceptions=True waits for every row to finish, so a single
        # bad row neither discards the other results nor lets the session
        # close while requests are still in flight.
        outcomes = await asyncio.gather(
            *(guarded(row) for row in rows),
            return_exceptions=True,
        )

    results = [item for item in outcomes if not isinstance(item, BaseException)]
    failures = [item for item in outcomes if isinstance(item, BaseException)]

    with Path("labels.csv").open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["id", "label"])
        writer.writeheader()
        writer.writerows(results)

    if failures:
        raise RuntimeError(
            f"{len(failures)} of {len(rows)} rows failed; "
            "labels.csv contains only the rows that succeeded"
        ) from failures[0]


if __name__ == "__main__":
    # Run the batch only when executed as a script, so importing this module
    # (for reuse or testing) does not start network calls.
    asyncio.run(main())

Run it:

TOKENHUB_KEY=... python classify.py

Production notes