Simple way to rate limit HttpClient requests
I am using HttpClient from System.Net.Http to make requests against an API. The API is limited to 10 requests per second.

My code is roughly like so:

    List<Task> taskList = items.Select(i => ProcessItem(i)).ToList();

    try
    {
        await Task.WhenAll(taskList);
    }
    catch (Exception ex)
    {
        // log/handle failures here
    }

The ProcessItem method does a few things but always calls the API using the following: await SendRequestAsync(..blah), which looks like:

private async Task<Response> SendRequestAsync(HttpRequestMessage request, CancellationToken token)
{    
    token.ThrowIfCancellationRequested();
    var response = await HttpClient
        .SendAsync(request: request, cancellationToken: token).ConfigureAwait(continueOnCapturedContext: false);

    token.ThrowIfCancellationRequested();
    return await Response.BuildResponse(response);
}

Originally the code worked fine but when I started using Task.WhenAll I started getting 'Rate Limit Exceeded' messages from the API. How can I limit the rate at which requests are made?

It's worth noting that ProcessItem can make between one and four API calls, depending on the item.

Bailar answered 18/2, 2016 at 22:39 Comment(3)
How many requests are in items at any time? Where exactly do you form taskList?Kriskrischer
There are 18000 items, lots.Bailar
Is it possible to catch the fact you have hit a limit in the response and then wait the time needed before retrying.Leveloff

The API is limited to 10 requests per second.

Then just have your code do a batch of 10 requests, ensuring they take at least one second:

Item[] items = ...;

int index = 0;
while (index < items.Length)
{
  var timer = Task.Delay(TimeSpan.FromSeconds(1.2)); // ".2" to make sure
  var tasks = items.Skip(index).Take(10).Select(i => ProcessItemsAsync(i));
  var tasksAndTimer = tasks.Concat(new[] { timer });
  await Task.WhenAll(tasksAndTimer);
  index += 10;
}

Update

My ProcessItems method makes 1-4 API calls depending on the item.

In this case, batching is not an appropriate solution. You need to limit an asynchronous method to a certain number, which implies a SemaphoreSlim. The tricky part is that you want to allow more calls over time.

I haven't tried this code, but the general idea I would go with is to have a periodic function that releases the semaphore up to 10 times. So, something like this:

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10);

private async Task<Response> ThrottledSendRequestAsync(HttpRequestMessage request, CancellationToken token)
{
  await _semaphore.WaitAsync(token);
  return await SendRequestAsync(request, token);
}

private async Task PeriodicallyReleaseAsync(Task stop)
{
  while (true)
  {
    var timer = Task.Delay(TimeSpan.FromSeconds(1.2));

    if (await Task.WhenAny(timer, stop) == stop)
      return;

    // Release the semaphore at most 10 times.
    for (int i = 0; i != 10; ++i)
    {
      try
      {
        _semaphore.Release();
      }
      catch (SemaphoreFullException)
      {
        break;
      }
    }
  }
}

Usage:

// Start the periodic task, with a signal that we can use to stop it.
var stop = new TaskCompletionSource<object>();
var periodicTask = PeriodicallyReleaseAsync(stop.Task);

// Wait for all item processing.
await Task.WhenAll(taskList);

// Stop the periodic task.
stop.SetResult(null);
await periodicTask;
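For anyone who wants to try the idea end-to-end, here is a self-contained sketch of the same design (untested against a real API; class and method names are illustrative, not from the question). Note the two-argument SemaphoreSlim constructor, which caps maxCount at 10 — as one commenter discovered, that is what makes the SemaphoreFullException guard in the release loop actually fire:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class RateThrottler
{
    // initialCount AND maxCount are both 10, so Release() throws
    // SemaphoreFullException once the budget for the window is full.
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10, 10);

    // Consume one request slot, then run the caller's work. The slot is
    // deliberately NOT released afterwards; the periodic task below
    // refills slots on a timer instead.
    public async Task<T> RunThrottledAsync<T>(Func<Task<T>> action, CancellationToken token)
    {
        await _semaphore.WaitAsync(token).ConfigureAwait(false);
        return await action().ConfigureAwait(false);
    }

    public async Task PeriodicallyReleaseAsync(Task stop)
    {
        while (true)
        {
            var timer = Task.Delay(TimeSpan.FromSeconds(1.2));
            if (await Task.WhenAny(timer, stop).ConfigureAwait(false) == stop)
                return;

            // Refill the semaphore, at most 10 slots per window.
            for (int i = 0; i != 10; ++i)
            {
                try { _semaphore.Release(); }
                catch (SemaphoreFullException) { break; }
            }
        }
    }
}
```

Each API call then goes through `RunThrottledAsync(() => SendRequestAsync(request, token), token)`, so the limit is enforced per request rather than per item — which also covers the 1-4 calls per item the question mentions.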
Chablis answered 19/2, 2016 at 2:10 Comment(10)
Similar issue to the answer above. My ProcessItems method makes 1-4 API calls depending on the item. In my mind this pushes me towards implementing the rate limit as close to the api call as possible.Bailar
Be careful throttling requests in the code. Any request that you hold on to is going to eat up an http connection and can eventually start bottle-necking your server. If you can, throttle by rejecting the request with a response status code of 429 when the request is outside the bounds of the threshold.Moniz
Would there be a way to adapt this to simply just wait 50 ms in between HttpClient.GetAsyncs? I'm also sending them with a LINQ Select like the original question.Alleenallegation
@Hershizer33: It's possible to do this with async, but it's easier to express with Rx.Chablis
I'm trying to use your code to accomplish the same thing as the original poster. For testing purposes, I'm trying to just run 1 SendRequestAsync per second, so I tried changing the initial count of SemaphoreSlim to 1 as well as change the 10 to a 1 in the for loop which releases the semaphore. However, when I test my code with 20 requests, they are still all running at the same time. I would expect them instead to run 1 every second.Naquin
@Brent: What you described should work. If you have a reproducible example, it's probably easiest to post your own question with that example code.Chablis
@StephenCleary Thanks for confirming. I figured it out! I just had to initialize the SemaphoreSlim with two arguments, like new SemaphoreSlim(1, 1), rather than simply new SemaphoreSlim(1). I was making the wrong assumption that initialCount would define the maxCount. My test code had been running for some time, so it had accumulated requests by the time I started consuming them.Naquin
@StephenCleary why isn't batching an appropriate solution here?Tawana
Also, where do you call the throttle in your second solution?Tawana
@johnny5: I believe the reason I said that was because if the items were batched, then the full 10 requests per second limit may not be filled up. But if all the requests were queued up and throttled, then the code can always send 10/sec until the queue is drained. The throttling is done by the call to WaitAsync.Chablis

The answer is similar to this one.

Instead of using a list of tasks and WhenAll, use Parallel.ForEach with ParallelOptions to limit the number of concurrent tasks to 10, and make sure each one takes at least one second:

Parallel.ForEach(
    items,
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async item => {
      await ProcessItems(item);
      await Task.Delay(1000);
    }
);

Or if you want to make sure each item takes as close to 1 second as possible:

Parallel.ForEach(
    items,
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async item => {
        var watch = new Stopwatch();
        watch.Start();
        await ProcessItems(item);
        watch.Stop();
        if (watch.ElapsedMilliseconds < 1000) await Task.Delay((int)(1000 - watch.ElapsedMilliseconds));
    }
);

Or:

Parallel.ForEach(
    items,
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async item => {
        await Task.WhenAll(
                Task.Delay(1000),
                ProcessItems(item)
            );
    }
);
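A caveat worth flagging before copying these: Parallel.ForEach takes a synchronous delegate, so an async lambda passed to it compiles to an async-void method, and the loop moves on without waiting for the delay (or the request) to finish. A hedged sketch of the same "at most 10 in flight, each holding its slot for at least a second" idea using only Task machinery — this is an illustration, not the answer's code, and ProcessItems is a stand-in for the question's method:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledForEach
{
    public static async Task RunAsync<T>(IEnumerable<T> items, Func<T, Task> body, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync();
                try
                {
                    // Pair the work with a one-second delay so each of
                    // the 'maxConcurrency' slots is held for at least a
                    // second, mirroring the intent of the answer above.
                    await Task.WhenAll(body(item), Task.Delay(1000));
                }
                finally { gate.Release(); }
            }).ToList();

            await Task.WhenAll(tasks);
        }
    }
}
```

Usage would be `await ThrottledForEach.RunAsync(items, item => ProcessItems(item), maxConcurrency: 10);`.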
Gannet answered 18/2, 2016 at 23:0 Comment(6)
That won't help with rate limiting if the requests complete in less than one second, since the limiting is based on requests/second rather than concurrent requests.Kriskrischer
Right, I missed that. I've edited my answer to add a delay for each item.Gannet
While that will work, it's less than optimal because you will have at most 10 requests/second (you get that if ProcessItems() takes no time).Kriskrischer
It's like blackjack. He wants to get as close to 10 requests/second without going over.Kriskrischer
I've added some options.Gannet
This is a pretty creative answer. I'm surprised I don't see it mentioned in other answers on Parallel.ForEach(). The await Task.WhenAll(...) is the other half of rate limiting along with choosing a MaxDegreeOfParallelism. It seems more elegant than the SemaphoreSlim model as well.Undercast

UPDATED ANSWER

My ProcessItems method makes 1-4 API calls depending on the item. So with a batch size of 10 I still exceed the rate limit.

You need to implement a rolling window in SendRequestAsync. A queue containing timestamps of each request is a suitable data structure. You dequeue entries with a timestamp older than one second (the question's limit is 10 requests per second). As it so happens, there is an implementation as an answer to a similar question on SO.
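The linked implementation isn't reproduced here, but the data structure is simple enough to sketch. This is an untested illustration (not the SO answer's exact code); the limit and window below match the question's 10 requests per second:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class RollingWindowLimiter
{
    private readonly Queue<DateTime> _timestamps = new Queue<DateTime>();
    private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1, 1); // async-friendly lock
    private readonly int _limit;
    private readonly TimeSpan _window;

    public RollingWindowLimiter(int limit, TimeSpan window)
    {
        _limit = limit;
        _window = window;
    }

    // Asynchronously blocks until sending one more request stays
    // within 'limit' requests per 'window'.
    public async Task WaitAsync(CancellationToken token = default(CancellationToken))
    {
        while (true)
        {
            TimeSpan delay;
            await _mutex.WaitAsync(token);
            try
            {
                var now = DateTime.UtcNow;
                // Drop timestamps that have fallen out of the window.
                while (_timestamps.Count > 0 && now - _timestamps.Peek() >= _window)
                    _timestamps.Dequeue();

                if (_timestamps.Count < _limit)
                {
                    _timestamps.Enqueue(now);
                    return;
                }
                // The oldest entry decides how long until a slot frees up.
                delay = _window - (now - _timestamps.Peek());
            }
            finally { _mutex.Release(); }

            await Task.Delay(delay, token);
        }
    }
}
```

You would call `await limiter.WaitAsync(token);` at the top of SendRequestAsync, before HttpClient.SendAsync, with `new RollingWindowLimiter(10, TimeSpan.FromSeconds(1))` shared across all callers.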

ORIGINAL ANSWER

May still be useful to others

One straightforward way to handle this is to batch your requests in groups of 10, run those concurrently, and then wait until a total of one second has elapsed (if it hasn't already). This will bring you in right at the rate limit if the batch of requests can complete in one second, but is less than optimal if the batch of requests takes longer. Have a look at the .Batch() extension method in MoreLinq. Code would look approximately like

foreach (var batch in items.Batch(10))
{
    Stopwatch sw = Stopwatch.StartNew(); // From System.Diagnostics
    await Task.WhenAll(batch.Select(i => ProcessItem(i)));
    if (sw.Elapsed.TotalSeconds < 1.0)
    {
        // Wait out the remainder of the second. You might want to pad
        // this to 1.05 or 1.1 seconds just in case the rate limiting
        // on the other side isn't perfectly implemented.
        await Task.Delay(TimeSpan.FromSeconds(1.1) - sw.Elapsed);
    }
}
Kriskrischer answered 18/2, 2016 at 23:24 Comment(1)
I've actually tried this idea but it doesn't work. My ProcessItems method makes 1-4 API calls depending on the item. So with a batch size of 10 I still exceed the rate limit. With a smaller batch size, say 5, it takes considerably longer to process the 18000 items.Bailar

https://github.com/thomhurst/EnumerableAsyncProcessor

I've written a library to help with this sort of logic.

Usage would be:

var responses = await AsyncProcessorBuilder.WithItems(items) // Or Extension Method: items.ToAsyncProcessorBuilder()
        .SelectAsync(item => ProcessItem(item), CancellationToken.None)
        .ProcessInParallel(levelOfParallelism: 10, TimeSpan.FromSeconds(1));
Partner answered 8/1, 2022 at 12:7 Comment(1)
I assume this only works when all requests are known at the beginning? You cannot add requests mid-way, right?Chem

© 2022 - 2024 — McMap. All rights reserved.