Async + Token Bucket: How to Batch LLM API Calls Efficiently

Recently, at work, I’ve been setting up an LLM evaluation platform. One particular scenario came up: we need to call an LLM API provided by another department to run model evaluations on a test dataset, but this API has a rate limit of at most 2 calls per second (2 RPS). Thus, my task essentially boils down to: how do I maximize concurrency to speed up model evaluation while strictly adhering to the API rate limit? In this brief post, I will share my thoughts on approaching this task.

For demonstration purposes, we’ll use FastAPI to create a /chat endpoint and rate-limit it with slowapi, as shown below.

import random
import asyncio
from fastapi import FastAPI, Response, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# Note: the route decorator must be above the limit decorator, not below it
@app.get("/chat")
@limiter.limit("2/second")
async def chat(request: Request, response: Response):
    # Simulate a slow LLM call: respond after a random 2-5 second delay
    await asyncio.sleep(random.randint(2, 5))
    return "hello world"

Once the FastAPI service is running, we can treat http://127.0.0.1:8000/chat as a rate-limited LLM endpoint.

Since it’s an I/O-bound task, my first instinct is to use a thread pool for concurrency. For sending HTTP requests, I chose the popular requests library.

Let’s import all the libraries we need.

import time
import requests
from concurrent.futures import ThreadPoolExecutor

# Reference point for the elapsed-time printouts in the logs below
START = time.time()

Next, we’ll implement a function called call_api to send HTTP requests with retry logic.

def call_api(
    url: str,
    idx: int,
    max_retry: int = 5,
    retry_sleep: int = 3,
    retry_multiplier: int = 2,
) -> dict:
    for i in range(max_retry):
        print(f"[Request {idx}] Sending {time.time() - START:.2f}")
        try:
            response = requests.get(url)
            response.raise_for_status()
            result = response.json()
            print(
                f"[Request {idx}] SUCCESS {time.time() - START:.2f}: receive {result}"
            )
            return result
        except Exception:
            print(
                f"[Request {idx}] FAIL {time.time() - START}: retry in {retry_sleep} seconds"
            )
            time.sleep(retry_sleep)
            # Back off before the next attempt
            retry_sleep *= retry_multiplier
    raise Exception("Hit max retry limit")

Next, let’s create a thread pool of size 2 (to respect the rate limit) and use it to send our 10 HTTP requests.

print(f"=== {time.time() - START:.2f} Start All Tasks ===")

with ThreadPoolExecutor(2) as executor:
    urls = ["http://127.0.0.1:8000/chat"] * 10
    indices = list(range(len(urls)))
    results = executor.map(call_api, urls, indices)

print(f"=== {time.time() - START:.2f} Finish All Tasks ===")

After launching the task, the log shows:

=== 0.00 Start All Tasks ===
[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 1] SUCCESS 3.01: receive hello world
[Request 2] Sending 3.01
[Request 0] SUCCESS 5.01: receive hello world
[Request 3] Sending 5.01
[Request 3] SUCCESS 7.01: receive hello world
[Request 4] Sending 7.01
[Request 2] SUCCESS 7.01: receive hello world
[Request 5] Sending 7.02
[Request 5] SUCCESS 10.01: receive hello world
[Request 6] Sending 10.01
[Request 4] SUCCESS 12.01: receive hello world
[Request 7] Sending 12.01
[Request 6] SUCCESS 13.01: receive hello world
[Request 8] Sending 13.01
[Request 7] SUCCESS 15.02: receive hello world
[Request 9] Sending 15.02
[Request 8] SUCCESS 17.02: receive hello world
[Request 9] SUCCESS 20.02: receive hello world
=== 20.02 Finish All Tasks ===

From the logs, we can see that it takes approximately 20 seconds to complete 10 HTTP requests - significantly longer than the roughly 10 seconds I expected. Given that the endpoint accepts 2 new requests per second and each response takes at most 5 seconds, all 10 requests should be sent within 5 seconds and the last one should finish within about 10 seconds, so this performance is unexpected.

To obtain more reliable performance metrics, I used hyperfine and the result is:

Benchmark 1: uv run multi_thread.py
  Time (mean ± σ):     18.688 s ±  2.007 s    [User: 0.118 s, System: 0.028 s]
  Range (min … max):   15.198 s … 22.165 s    10 runs

Indeed, the solution above never completes 10 HTTP requests within 10 seconds - the average time is approximately 18 seconds. How could this happen? By analyzing the logs, I discovered some unexpected patterns.

[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 2] Sending 3.01
[Request 3] Sending 5.01

The first 2 HTTP requests were sent immediately. However, the 3rd and 4th HTTP requests were delayed by 3 and 5 seconds respectively. This contradicts the expected behavior of sending 2 requests per second.

🤔 The cause becomes obvious once you understand how blocking I/O limits concurrency. The requests library is synchronous - each requests.get call in call_api blocks its thread until the response arrives. With a pool of only 2 threads, at most 2 requests are ever in flight, and each occupies its thread for the full 2-5 second response time, so a new request can only be sent once a previous one finishes - nowhere near 2 requests per second.
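
A rough back-of-the-envelope check (my own arithmetic, assuming an average response time of 3.5 seconds, the midpoint of the 2-5 second delay) lines up with the ~18-19 seconds measured above:

pool_size = 2
avg_response_time = 3.5  # seconds; assumed midpoint of the endpoint's 2-5 s delay
n_requests = 10

# Each blocked thread completes roughly one request per avg_response_time
throughput = pool_size / avg_response_time       # ~0.57 requests/second
estimated_total = n_requests / throughput        # ~17.5 seconds
print(f"~{throughput:.2f} req/s, ~{estimated_total:.1f} s for {n_requests} requests")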

Note

Conclusion: Avoid combining multithreading with the synchronous requests library for concurrent HTTP requests.

After identifying the cause of the performance bottleneck, I discovered aiohttp, a popular async HTTP client library. Its official documentation is well written and easy to follow for developers with basic async knowledge.

Let’s turn call_api into an asynchronous function, as shown below. Since this lives in a separate script, it needs its own imports and START definition.

import asyncio
import time

import aiohttp

START = time.time()


async def call_api(
    session: aiohttp.ClientSession,
    url: str,
    idx: int,
    max_retry: int = 5,
    retry_sleep: int = 3,
    retry_multiplier: int = 2,
) -> dict:
    for i in range(max_retry):
        print(f"[Request {idx}] Sending {time.time() - START:.2f}")
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                result = await response.json()
                print(
                    f"[Request {idx}] SUCCESS {time.time() - START:.2f}: receive {result}"
                )
                return result
        except Exception:
            print(
                f"[Request {idx}] FAIL {time.time() - START}: retry in {retry_sleep} seconds"
            )
            await asyncio.sleep(retry_sleep)
            # Back off before the next attempt
            retry_sleep *= retry_multiplier
    raise Exception("Hit max retry limit")

Next, let’s write a main function to launch all tasks using asyncio.gather.

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ["http://127.0.0.1:8000/chat"] * 10
        # Launch all 10 requests concurrently and wait for them to finish
        results = await asyncio.gather(
            *[call_api(session, url, i) for i, url in enumerate(urls)]
        )


if __name__ == "__main__":
    print(f"=== {time.time() - START:.2f} Start All Tasks ===")
    asyncio.run(main())
    print(f"=== {time.time() - START:.2f} Finish All Tasks ===")

The logs are shown below.

=== 0.00 Start All Tasks ===
[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 2] Sending 0.00
[Request 3] Sending 0.00
[Request 4] Sending 0.00
[Request 5] Sending 0.00
[Request 6] Sending 0.00
[Request 7] Sending 0.00
[Request 8] Sending 0.00
[Request 9] Sending 0.00
[Request 4] FAIL 0.00676417350769043: retry in 3 seconds
[Request 9] FAIL 0.0068662166595458984: retry in 3 seconds
[Request 2] FAIL 0.006894111633300781: retry in 3 seconds
[Request 7] FAIL 0.006909370422363281: retry in 3 seconds
[Request 5] FAIL 0.006922006607055664: retry in 3 seconds
[Request 3] FAIL 0.0069332122802734375: retry in 3 seconds
[Request 8] FAIL 0.006942033767700195: retry in 3 seconds
[Request 6] FAIL 0.006951332092285156: retry in 3 seconds
...

🤔 Eight out of ten HTTP requests failed due to the rate limit. Still, we did see an improvement - all 10 HTTP requests were sent concurrently. Now we just need to enforce the rate limit on the client side.

Note

Conclusion: Use an asynchronous library such as aiohttp for concurrent HTTP requests.

To throttle our own requests to 2 RPS, we can use the token bucket algorithm. A token bucket works like an actual bucket containing tokens: each operation must take tokens from the bucket before proceeding, and tokens are replenished at a fixed rate. The key parameters are:

  • rate: replenishment rate in tokens/second.
  • tokens: the currently available tokens.
  • capacity: the maximum number of tokens the bucket can hold.
  • last_update: timestamp of the last bucket replenishment.
  • need: tokens required per operation.

Tip

Because the rate limit is 2 RPS, the capacity of the token bucket should be 2 and the rate should be 2 tokens/second.

The detailed algorithm pseudo-code is shown below.

  • Initialization
    • rate = ..., the specified replenishment rate - 2 in our example.
    • capacity = ..., the token bucket capacity - 2 in our example.
    • tokens = 0
    • last_update = time.monotonic()
  • Acquire (per operation)
    1. Replenish tokens based on the time elapsed since last_update, capping tokens at capacity.
    2. Check the current tokens:
      • If tokens >= need, deduct need from tokens and proceed.
      • Otherwise, wait (need - tokens) / rate seconds and go back to step 1.

Tip
  1. Since tokens and last_update are shared variables across coroutines, we use asyncio.Lock() to ensure atomic updates.
  2. We use time.monotonic instead of time.time because it is unaffected by system clock adjustments and never goes backwards, which is critical for correct elapsed-time calculation.

The code is shown below.

class RateLimiter:
    def __init__(self, rate: int, capacity: int):
        self.rate = rate
        self.tokens = 0
        self.capacity = capacity
        self.lock = asyncio.Lock()
        self.last_update = time.monotonic()

    async def acquire(self, need: int = 1):
        while True:
            async with self.lock:
                now = time.monotonic()
                elapsed = now - self.last_update

                # Add new tokens
                new_tokens = int(elapsed * self.rate)
                if new_tokens > 0:
                    self.tokens = min(self.capacity, self.tokens + new_tokens)
                    self.last_update = now

                # Enough tokens available: consume them and proceed
                if self.tokens >= need:
                    self.tokens -= need
                    return

                # Not enough tokens: compute how long until the deficit is replenished
                deficit = need - self.tokens
                wait_time = deficit / self.rate

            await asyncio.sleep(wait_time)
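
As a quick standalone sanity check of the limiter (my own sketch, not part of the original evaluation code; the function and variable names are made up for illustration), we can let a handful of dummy tasks acquire tokens and print when they get through - acquisitions should come out at roughly 2 per second:

async def demo_rate_limiter():
    # 6 tasks competing for tokens from a 2-token bucket refilled at 2 tokens/second
    limiter = RateLimiter(rate=2, capacity=2)
    start = time.monotonic()

    async def worker(i: int):
        await limiter.acquire()
        print(f"task {i} got a token at {time.monotonic() - start:.2f}s")

    await asyncio.gather(*(worker(i) for i in range(6)))


# asyncio.run(demo_rate_limiter())  # expected: tokens granted roughly every 0.5 seconds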

Next, we need to modify the previously written code:

Info

For simplicity, I’ll omit the duplicate parts (marked with ...)

async def call_api(
    ...
    rate_limiter: RateLimiter,
    ...
) -> dict:
    ...
        try:
            await rate_limiter.acquire()
            ...
        except Exception:
            ...
    ...

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ["http://127.0.0.1:8000/chat"] * 10
        rate_limiter = RateLimiter(rate=2, capacity=2)
        # Launch all 10 requests concurrently, each gated by the shared rate limiter
        results = await asyncio.gather(
            *[call_api(session, rate_limiter, url, i) for i, url in enumerate(urls)]
        )

        
if __name__ == "__main__":
    print(f"=== {time.time() - START:.2f} Start All Tasks ===")
    asyncio.run(main())
    print(f"=== {time.time() - START:.2f} Finish All Tasks ===")

Finally, let’s use hyperfine to test the performance.

Benchmark 1: uv run async_concurrency.py
  Time (mean ± σ):      9.766 s ±  0.552 s    [User: 0.173 s, System: 0.032 s]
  Range (min … max):    8.715 s … 10.240 s    10 runs

The improvement is significant: the mean run time drops to about 9.8 seconds, right around the 10 seconds I expected.

The above pretty much captures my thought process while tackling this issue. As an AI engineer, I mostly work with Python multiprocessing for data processing and data analysis. This time, dealing with batching LLM API calls introduced me to new concepts like async programming and the token bucket algorithm — both of which were fresh territory for me. Overall, it was a fun challenge, and I learned a lot from it. :D