Async + Token Bucket: How to Batch LLM API Calls Efficiently
Background
Recently, at work, I’ve been setting up an LLM evaluation platform. One scenario in particular stood out: we need to call an LLM API provided by another department to run model evaluations on a test dataset, but this API is rate-limited to a maximum of 2 calls per second (2 RPS). My task therefore boils down to this: maximize concurrency to speed up model evaluation while strictly respecting the API rate limit. In this brief post, I’ll share how I approached it.
Environment Setup
For demonstration purposes, we’ll use fastapi to create a /chat endpoint with a rate limit enforced via slowapi, as shown below.
import random
import asyncio
from fastapi import FastAPI, Response, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Note: the route decorator must be above the limit decorator, not below it
@app.get("/chat")
@limiter.limit("2/second")
async def chat(request: Request, response: Response):
await asyncio.sleep(random.randint(2, 5))
return "hello world"
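To start the service, we can run it with uvicorn. Here is a minimal sketch, assuming uvicorn is installed and the code above lives in a single file:
import uvicorn
if __name__ == "__main__":
    # Serve the rate-limited demo endpoint on localhost:8000
    uvicorn.run(app, host="127.0.0.1", port=8000)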
Once the FastAPI service is running, we can treat http://127.0.0.1:8000/chat as a rate-limited LLM endpoint.
First try: Multithreading + Requests
Since it’s an I/O-bound task, my first instinct is to use a thread pool for concurrency. For sending HTTP requests, I chose the popular requests library.
Let’s import all the libraries we need.
import time
import requests
from concurrent.futures import ThreadPoolExecutor
START = time.time()  # reference point for the elapsed-time logs below
Next, we’ll implement a function called call_api to send HTTP requests with retry logic.
def call_api(
url: str,
idx: int,
max_retry: int = 5,
retry_sleep: int = 3,
retry_multiplier: int = 2,
) -> dict:
for i in range(max_retry):
print(f"[Request {idx}] Sending {time.time() - START:.2f}")
try:
            response = requests.get(url)
            response.raise_for_status()
            result = response.json()
            print(
                f"[Request {idx}] SUCCESS {time.time() - START:.2f}: receive {result}"
            )
            return result
        except Exception:
            print(
                f"[Request {idx}] FAIL {time.time() - START:.2f}: retry in {retry_sleep} seconds"
            )
            time.sleep(retry_sleep)
            retry_sleep *= retry_multiplier
raise Exception("Hit max retry limit")
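With these defaults, a request that keeps failing is retried after waits of roughly 3, 6, 12, and 24 seconds (each wait multiplied by retry_multiplier) before call_api finally gives up and raises.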
Next, let’s create a thread pool of size 2 (to respect the rate limit) and use it to send our 10 HTTP requests.
print(f"=== {time.time() - START:.2f} Start All Tasks ===")
with ThreadPoolExecutor(2) as executor:
urls = ["http://127.0.0.1:8000/chat"] * 10
indices = list(range(len(urls)))
results = executor.map(call_api, urls, indices)
print(f"=== {time.time() - START:.2f} Finish All Tasks ===")
After launching the task, the log shows:
=== 0.00 Start All Tasks ===
[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 1] SUCCESS 3.01: receive hello world
[Request 2] Sending 3.01
[Request 0] SUCCESS 5.01: receive hello world
[Request 3] Sending 5.01
[Request 3] SUCCESS 7.01: receive hello world
[Request 4] Sending 7.01
[Request 2] SUCCESS 7.01: receive hello world
[Request 5] Sending 7.02
[Request 5] SUCCESS 10.01: receive hello world
[Request 6] Sending 10.01
[Request 4] SUCCESS 12.01: receive hello world
[Request 7] Sending 12.01
[Request 6] SUCCESS 13.01: receive hello world
[Request 8] Sending 13.01
[Request 7] SUCCESS 15.02: receive hello world
[Request 9] Sending 15.02
[Request 8] SUCCESS 17.02: receive hello world
[Request 9] SUCCESS 20.02: receive hello world
=== 20.02 Finish All Tasks ===
From the logs, we can see that it takes approximately 20 seconds to complete 10 HTTP requests - significantly longer than the roughly 10 seconds I expected. Given that the endpoint accepts 2 new requests every second and each response takes at most 5 seconds, the last pair of requests should go out around the 4-second mark and come back within 5 more seconds, so the whole batch should finish in about 10 seconds.
To obtain more reliable performance metrics, I used hyperfine and the result is:
Benchmark 1: uv run multi_thread.py
Time (mean ± σ): 18.688 s ± 2.007 s [User: 0.118 s, System: 0.028 s]
Range (min … max): 15.198 s … 22.165 s 10 runs
Indeed, the solution above never completes 10 HTTP requests within 10 seconds - the average time is approximately 18 seconds. How could this happen? By analyzing the logs, I discovered some unexpected patterns.
[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 2] Sending 3.01
[Request 3] Sending 5.01
The first 2 HTTP requests were sent immediately. However, the 3rd and 4th HTTP requests were delayed by 3 and 5 seconds respectively. This contradicts the expected behavior of sending 2 requests per second.
🤔 The cause becomes obvious once you look at how blocking I/O interacts with the thread pool. The requests library is synchronous - each requests.get call in call_api blocks its worker thread until the response arrives. With only 2 worker threads, at most 2 requests can be in flight at any moment, and a new request is sent only after a previous one finishes (2 to 5 seconds later) - far below the 2 requests per second the endpoint allows.
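A quick back-of-the-envelope check confirms this: the simulated response time is a random 2 to 5 seconds (3.5 seconds on average), so 2 workers complete roughly 2 requests every 3.5 seconds, and 10 requests take about 10 × 3.5 / 2 ≈ 17.5 seconds - close to the ~18.7 seconds measured by hyperfine.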
Conclusion: Avoid combining multithreading with the synchronous requests library for concurrent HTTP requests.
Second try: async programming with aiohttp
After identifying the cause of the performance bottleneck, I discovered aiohttp, a popular async HTTP client library. Its official documentation is well written for developers with basic async knowledge.
Let’s make call_api an asynchronous function, as shown below (note that this script now imports aiohttp in place of requests).
import asyncio
import time
import aiohttp
START = time.time()
async def call_api(
session: aiohttp.ClientSession,
url: str,
idx: int,
max_retry: int = 5,
retry_sleep: int = 3,
retry_multiplier: int = 2,
) -> dict:
for i in range(max_retry):
print(f"[Request {idx}] Sending {time.time() - START:.2f}")
try:
async with session.get(url) as response:
response.raise_for_status()
result = await response.json()
print(
f"[Request {idx}] SUCCESS {time.time() - START:.2f}: receive {result}"
)
return result
        except Exception:
            print(
                f"[Request {idx}] FAIL {time.time() - START:.2f}: retry in {retry_sleep} seconds"
            )
            await asyncio.sleep(retry_sleep)
            retry_sleep *= retry_multiplier
raise Exception("Hit max retry limit")
Next, let’s write a main function to launch all tasks using asyncio.gather.
async def main():
async with aiohttp.ClientSession() as session:
urls = ["http://127.0.0.1:8000/chat"] * 10
results = asyncio.gather(
*[call_api(session, url, i) for i, url in enumerate(urls)]
)
await results
if __name__ == "__main__":
print(f"=== {time.time() - START:.2f} Start All Tasks ===")
asyncio.run(main())
print(f"=== {time.time() - START:.2f} Finish All Tasks ===")
The logs are shown below.
=== 0.00 Start All Tasks ===
[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 2] Sending 0.00
[Request 3] Sending 0.00
[Request 4] Sending 0.00
[Request 5] Sending 0.00
[Request 6] Sending 0.00
[Request 7] Sending 0.00
[Request 8] Sending 0.00
[Request 9] Sending 0.00
[Request 4] FAIL 0.01: retry in 3 seconds
[Request 9] FAIL 0.01: retry in 3 seconds
[Request 2] FAIL 0.01: retry in 3 seconds
[Request 7] FAIL 0.01: retry in 3 seconds
[Request 5] FAIL 0.01: retry in 3 seconds
[Request 3] FAIL 0.01: retry in 3 seconds
[Request 8] FAIL 0.01: retry in 3 seconds
[Request 6] FAIL 0.01: retry in 3 seconds
...
🤔 Eight out of ten HTTP requests failed due to rate limiting. However, we did see an improvement - all 10 HTTP requests were sent immediately and concurrently. Now we just need to enforce rate-limit compliance.
Conclusion: Use an asynchronous library such as aiohttp for concurrent HTTP requests.
Final answer: aiohttp + token bucket algorithm
Because the rate limit is 2 RPS, the capacity of the token bucket should be 2 and the rate should be 2 tokens/second.
The token bucket works like an actual bucket containing tokens. Each operation consumes tokens from the bucket before proceeding. Typically, tokens are replenished at a fixed rate. There are some key factors:
- rate: replenishment rate, in tokens per second.
- tokens: currently available tokens.
- capacity: the token bucket capacity.
- last_update: timestamp of the last bucket replenishment.
- need: tokens required per operation.
The detailed algorithm pseudo-code is shown below.
- Initialization
  - rate = ...: initialize the rate with the specified value - 2 in our example.
  - capacity = ...: initialize the token bucket capacity - 2 in our example.
  - tokens = 0
  - last_update = time.monotonic()
- Acquire (repeat until a token is granted)
  - Replenish tokens based on the time elapsed since last_update. Note that tokens is capped at capacity.
  - Check the current tokens:
    - If tokens >= need, deduct need from tokens and proceed.
    - Otherwise, wait (need - tokens) / rate seconds and go back to the first step.
- Since tokens and last_update are shared variables across coroutines, we use asyncio.Lock() to ensure atomic updates.
- We use time.monotonic instead of time.time because it never goes backwards and is unaffected by system clock adjustments, which is critical for correct elapsed-time calculation.
The code is shown below.
class RateLimiter:
def __init__(self, rate: int, capacity: int):
self.rate = rate
self.tokens = 0
self.capacity = capacity
self.lock = asyncio.Lock()
self.last_update = time.monotonic()
async def acquire(self, need: int = 1):
while True:
async with self.lock:
now = time.monotonic()
elapsed = now - self.last_update
# Add new tokens
new_tokens = int(elapsed * self.rate)
if new_tokens > 0:
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
if self.tokens >= need:
self.tokens -= need
return
deficit = need - self.tokens
wait_time = deficit / self.rate
await asyncio.sleep(wait_time)
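Before wiring the limiter into call_api, here is a small standalone sketch (my own sanity check, not part of the original setup) to verify that acquire hands out roughly 2 tokens per second with this configuration:
async def demo_limiter():
    limiter = RateLimiter(rate=2, capacity=2)
    start = time.monotonic()
    for i in range(6):
        await limiter.acquire()
        # With rate=2, grants should arrive roughly every 0.5 seconds
        print(f"token {i} granted at {time.monotonic() - start:.2f}s")
asyncio.run(demo_limiter())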
Next, we need to modify the previously written code. For simplicity, I’ll omit the unchanged parts (marked with ...).
async def call_api(
...
rate_limiter: RateLimiter,
...
) -> dict:
...
try:
await rate_limiter.acquire()
...
except Exception:
...
...
async def main():
async with aiohttp.ClientSession() as session:
urls = ["http://127.0.0.1:8000/chat"] * 10
rate_limiter = RateLimiter(rate=2, capacity=2)
results = asyncio.gather(
*[call_api(session, rate_limiter, url, i) for i, url in enumerate(urls)]
)
await results
if __name__ == "__main__":
print(f"=== {time.time() - START:.2f} Start All Tasks ===")
asyncio.run(main())
print(f"=== {time.time() - START:.2f} Finish All Tasks ===")
Finally, let’s use hyperfine to test the performance.
Benchmark 1: uv run async_concurrency.py
Time (mean ± σ): 9.766 s ± 0.552 s [User: 0.173 s, System: 0.032 s]
Range (min … max): 8.715 s … 10.240 s 10 runs
The improvement is significant: the run now finishes in roughly 10 seconds, matching my expectation.
Wrap-up
The above pretty much captures my thought process while tackling this issue. As an AI engineer, I mostly work with Python multiprocessing for data processing and data analysis. This time, dealing with batching LLM API calls introduced me to new concepts like async programming and the token bucket algorithm — both of which were fresh territory for me. Overall, it was a fun challenge, and I learned a lot from it. :D