
Async + Leaky Bucket: How to Batch LLM API Calls Efficiently

Recently, at work, I’ve been setting up an LLM evaluation platform. One scenario in particular: we need to call an LLM API provided by another department to run model evaluations on a test dataset, but this API has a rate limit of at most 2 calls per second (2 RPS). My task therefore boils down to: how do I maximize concurrency to speed up model evaluation while strictly adhering to the API rate limit? In this brief post, I will share my thought process for approaching this task.

For demonstration purposes, we’ll use FastAPI to create a /chat endpoint with a rate limit enforced via slowapi, as shown below.

import random
import asyncio
from fastapi import FastAPI, Response, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# Note: the route decorator must be above the limit decorator, not below it
@app.get("/chat")
@limiter.limit("2/second")
async def chat(request: Request, response: Response):
    await asyncio.sleep(random.randint(2, 5))
    return "hello world"

Once the FastAPI service is running, we can treat http://127.0.0.1:8000/chat as a rate-limited LLM endpoint.

Since it’s an I/O-bound task, my first instinct is to use a thread pool for concurrency. For sending HTTP requests, I chose the popular requests library.

Let’s import all the libraries we need.

import time
import requests
from concurrent.futures import ThreadPoolExecutor

# Reference point for the elapsed-time values printed in the logs
START = time.time()

Next, we’ll implement a function called call_api to send HTTP requests with retry logic.

def call_api(
    url: str,
    idx: int,
    max_retry: int = 5,
    retry_sleep: int = 3,
    retry_multiplier: int = 2,
) -> dict:
    for _ in range(max_retry):
        print(f"[Request {idx}] Sending {time.time() - START:.2f}")
        try:
            response = requests.get(url)
            # Raises for 4xx/5xx responses (e.g. HTTP 429 when rate-limited), triggering a retry
            response.raise_for_status()
            result = response.json()
            print(
                f"[Request {idx}] SUCCESS {time.time() - START:.2f}: receive {result}"
            )
            return result
        except Exception:
            print(
                f"[Request {idx}] FAIL {time.time() - START}: retry in {retry_sleep} seconds"
            )
            time.sleep(retry_sleep)
            # Exponential backoff: multiply the wait after each failed attempt
            retry_sleep *= retry_multiplier
    raise Exception("Hit max retry limit")

Next, let’s create a thread pool of size 2 (to respect the rate limit) and use it to send our 10 HTTP requests.

print(f"=== {time.time() - START:.2f} Start All Tasks ===")

with ThreadPoolExecutor(2) as executor:
    urls = ["http://127.0.0.1:8000/chat"] * 10
    indices = list(range(len(urls)))
    # Materialize the lazy map iterator so all results (and any exceptions) are collected
    results = list(executor.map(call_api, urls, indices))

print(f"=== {time.time() - START:.2f} Finish All Tasks ===")

After launching the task, the log shows:

=== 0.00 Start All Tasks ===
[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 1] SUCCESS 3.01: receive hello world
[Request 2] Sending 3.01
[Request 0] SUCCESS 5.01: receive hello world
[Request 3] Sending 5.01
[Request 3] SUCCESS 7.01: receive hello world
[Request 4] Sending 7.01
[Request 2] SUCCESS 7.01: receive hello world
[Request 5] Sending 7.02
[Request 5] SUCCESS 10.01: receive hello world
[Request 6] Sending 10.01
[Request 4] SUCCESS 12.01: receive hello world
[Request 7] Sending 12.01
[Request 6] SUCCESS 13.01: receive hello world
[Request 8] Sending 13.01
[Request 7] SUCCESS 15.02: receive hello world
[Request 9] Sending 15.02
[Request 8] SUCCESS 17.02: receive hello world
[Request 9] SUCCESS 20.02: receive hello world
=== 20.02 Finish All Tasks ===

From the logs, we can see that it takes approximately 20 seconds to complete 10 HTTP requests - significantly longer than the roughly 10 seconds I expected. Given that the server accepts 2 new requests per second and each response takes at most 5 seconds, all 10 requests should be sent within about 5 seconds and the whole batch should finish in roughly 10 seconds.

To obtain more reliable performance metrics, I used hyperfine and the result is:

Benchmark 1: uv run multi_thread.py
  Time (mean ± σ):     18.688 s ±  2.007 s    [User: 0.118 s, System: 0.028 s]
  Range (min … max):   15.198 s … 22.165 s    10 runs

Indeed, the solution above never completes 10 HTTP requests within 10 seconds - the average time is approximately 18 seconds. How could this happen? By analyzing the logs, I discovered some unexpected patterns.

[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 2] Sending 3.01
[Request 3] Sending 5.01

The first 2 HTTP requests were sent immediately. However, the 3rd and 4th HTTP requests were delayed by 3 and 5 seconds respectively. This contradicts the expected behavior of sending 2 requests per second.

🤔 The problem becomes obvious once you understand async programming. The requests library is synchronous - each requests.get call in call_api blocks its thread until the response arrives. With only 2 worker threads, a new request can only go out after an in-flight one completes, so the actual send rate falls far below the 2 RPS the server allows.
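
As a rough sanity check (my own arithmetic, not part of any benchmark), the snippet below estimates the throughput ceiling of this setup; the 3.5 s figure is simply the mean of random.randint(2, 5).

# Back-of-the-envelope estimate: with a pool of 2 workers, each worker is blocked
# for the full response time of its request, so the send rate is bounded by
# pool_size / avg_latency rather than by the server's 2 RPS limit.
pool_size = 2
avg_latency = 3.5  # mean of random.randint(2, 5) seconds
total_requests = 10

max_send_rate = pool_size / avg_latency  # ~0.57 requests/second
estimated_wall_time = total_requests / max_send_rate
print(f"Estimated wall time: {estimated_wall_time:.1f} s")  # ~17.5 s, close to the ~18 s measured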

Note

Conclusion: Avoid combining multithreading with the synchronous requests library for concurrent HTTP requests.

After identifying the cause of the performance bottleneck, I discovered aiohttp, a popular asynchronous HTTP client library. Its official documentation is well written and approachable for developers with basic async knowledge.

Let’s make call_api an asynchronous function, as shown below.

import asyncio
import time

import aiohttp

# Reference point for the elapsed-time values printed in the logs
START = time.time()


async def call_api(
    session: aiohttp.ClientSession,
    url: str,
    idx: int,
    max_retry: int = 5,
    retry_sleep: int = 3,
    retry_multiplier: int = 2,
) -> dict:
    for _ in range(max_retry):
        print(f"[Request {idx}] Sending {time.time() - START:.2f}")
        try:
            async with session.get(url) as response:
                # Raises for 4xx/5xx responses (e.g. HTTP 429 when rate-limited), triggering a retry
                response.raise_for_status()
                result = await response.json()
                print(
                    f"[Request {idx}] SUCCESS {time.time() - START:.2f}: receive {result}"
                )
                return result
        except Exception:
            print(
                f"[Request {idx}] FAIL {time.time() - START}: retry in {retry_sleep} seconds"
            )
            await asyncio.sleep(retry_sleep)
            # Exponential backoff: multiply the wait after each failed attempt
            retry_sleep *= retry_multiplier
    raise Exception("Hit max retry limit")

Next, let’s write a main function to launch all tasks using asyncio.gather.

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ["http://127.0.0.1:8000/chat"] * 10
        results = await asyncio.gather(
            *[call_api(session, url, i) for i, url in enumerate(urls)]
        )
        return results


if __name__ == "__main__":
    print(f"=== {time.time() - START:.2f} Start All Tasks ===")
    asyncio.run(main())
    print(f"=== {time.time() - START:.2f} Finish All Tasks ===")

The logs are shown below.

=== 0.00 Start All Tasks ===
[Request 0] Sending 0.00
[Request 1] Sending 0.00
[Request 2] Sending 0.00
[Request 3] Sending 0.00
[Request 4] Sending 0.00
[Request 5] Sending 0.00
[Request 6] Sending 0.00
[Request 7] Sending 0.00
[Request 8] Sending 0.00
[Request 9] Sending 0.00
[Request 4] FAIL 0.00676417350769043: retry in 3 seconds
[Request 9] FAIL 0.0068662166595458984: retry in 3 seconds
[Request 2] FAIL 0.006894111633300781: retry in 3 seconds
[Request 7] FAIL 0.006909370422363281: retry in 3 seconds
[Request 5] FAIL 0.006922006607055664: retry in 3 seconds
[Request 3] FAIL 0.0069332122802734375: retry in 3 seconds
[Request 8] FAIL 0.006942033767700195: retry in 3 seconds
[Request 6] FAIL 0.006951332092285156: retry in 3 seconds
...

🤔 Eight out of ten HTTP requests failed immediately because they hit the rate limit. Still, this is progress - all 10 HTTP requests are now sent concurrently. What remains is enforcing the rate limit on the client side, and the leaky bucket algorithm is a natural fit for that.

Note

Conclusion: Use an asynchronous library such as aiohttp for concurrent HTTP requests.

Warning

The leaky bucket algorithm differs from the token bucket algorithm in that it provides strict rate limiting, while the token bucket allows short-term bursts of traffic.
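
For contrast, here is a minimal token-bucket sketch. It is my own illustration and is not used anywhere else in this post: tokens refill at a fixed rate up to a capacity, so a burst of up to capacity requests can go out back-to-back before pacing kicks in.

import asyncio
import time


class TokenBucketLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full, so an initial burst is allowed
        self.last_refill = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            while True:
                now = time.monotonic()
                # Refill tokens based on elapsed time, capped at capacity
                self.tokens = min(
                    self.capacity, self.tokens + (now - self.last_refill) * self.rate
                )
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Not enough tokens yet: sleep until roughly one token is available
                await asyncio.sleep((1 - self.tokens) / self.rate)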

The leaky bucket algorithm, as the name suggests, is like a bucket that leaks water at a constant rate. In the context of handling HTTP requests here, the bucket contains pending HTTP requests to be sent, and the “leak rate” refers to the rate at which HTTP requests are sent (in this scenario, it’s 2 RPS).

The 2 RPS rate limit translates to sending 1 HTTP request every 0.5 seconds. Therefore, after each HTTP request is sent, we need to wait for this interval (denoted as interval). For easier implementation, we maintain a variable next_available indicating when the next HTTP request may be sent. However, note that with concurrent HTTP requests, next_available is shared state that may be modified by multiple coroutines, so a lock is required to make each update atomic.

The code is shown below.

class LeakyBucketLimiter:
    def __init__(self, rate: float, epsilon: float = 0.01):
        self.rate = rate
        self.interval = 1.0 / rate  # minimum spacing between two requests
        self.lock = asyncio.Lock()
        self.epsilon = epsilon  # small safety margin added to each interval
        self.next_available = time.monotonic()

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            wait_time = self.next_available - now
            if wait_time > 0:
                # Too early: sleep (while holding the lock) until our slot arrives
                await asyncio.sleep(wait_time)

            # Reserve the next slot for whichever coroutine acquires after us
            self.next_available = time.monotonic() + self.interval + self.epsilon

Warning

Some caveats:

  1. We use time.monotonic instead of time.time because it is unaffected by system clock adjustments and never goes backwards, which is essential for correct elapsed-time calculations.
  2. Without the epsilon term, timing jitter could push the actual sending rate slightly over the limit; epsilon makes the sending rate a bit more conservative.
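
To sanity-check the limiter in isolation, here is a small standalone demo of my own (assuming LeakyBucketLimiter is defined in the same module) that prints when each acquire call returns; with rate=2 and the default epsilon, consecutive timestamps should be spaced roughly 0.51 seconds apart.

import asyncio
import time


async def probe(limiter: LeakyBucketLimiter, idx: int, start: float):
    await limiter.acquire()
    # Each acquire should return ~interval + epsilon after the previous one
    print(f"coroutine {idx} acquired at {time.monotonic() - start:.2f} s")


async def demo():
    limiter = LeakyBucketLimiter(rate=2)
    start = time.monotonic()
    await asyncio.gather(*[probe(limiter, i, start) for i in range(5)])


if __name__ == "__main__":
    asyncio.run(demo())  # expected timestamps: ~0.00, 0.51, 1.02, 1.53, 2.04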

Next, we need to modify the previously written code:

Info

For simplicity, I’ll omit the unchanged parts (marked with ...)

async def call_api(
    ...
    rate_limiter: LeakyBucketLimiter,
    ...
) -> dict:
    ...
        try:
            await rate_limiter.acquire()
            ...
        except Exception:
            ...
    ...

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ["http://127.0.0.1:8000/chat"] * 10
        rate_limiter = LeakyBucketLimiter(rate=2)
        results = await asyncio.gather(
            *[call_api(session, rate_limiter, url, i) for i, url in enumerate(urls)]
        )
        return results

        
if __name__ == "__main__":
    print(f"=== {time.time() - START:.2f} Start All Tasks ===")
    asyncio.run(main())
    print(f"=== {time.time() - START:.2f} Finish All Tasks ===")

Finally, let’s use hyperfine to test the performance.

Benchmark 1: uv run async_concurrency.py
  Time (mean ± σ):      8.708 s ±  0.655 s    [User: 0.181 s, System: 0.035 s]
  Range (min … max):    7.332 s …  9.328 s    10 runs

The improvement is significant, and the run time now matches my expectation: all 10 requests complete within the roughly 10-second budget.

Note

Conclusion: Use leaky bucket for strict rate limiting.

The above pretty much captures my thought process while tackling this issue. As an AI engineer, I mostly work with Python multiprocessing for data processing and data analysis. This time, dealing with batching LLM API calls introduced me to new concepts like async programming and the leaky bucket algorithm — both of which were fresh territory for me. Overall, it was a fun challenge, and I learned a lot from it. :D