Add delay to parallel API call
I'm using Polly to make parallel API calls. However, the server can't process more than 25 calls per second, so I'm wondering if there is a way to add a 1-second delay after each batch of 25 calls.

var policy = Policy
    .Handle<HttpRequestException>()
    .RetryAsync(3);

var tasks = new List<Task<string>>();

foreach (var mediaItem in uploadedMedia)
{
    var mediaRequest = new HttpRequestMessage { *** };
    async Task<string> func()
    {
        var response = await client.SendAsync(mediaRequest);
        return await response.Content.ReadAsStringAsync();
    }
    tasks.Add(policy.ExecuteAsync(() => func()));
}
await Task.WhenAll(tasks);

I added a count as per the suggestion below, but it doesn't seem to work:

var count = 0;
foreach (var mediaItem in uploadedMedia.Items)
{
    var mediaRequest = new HttpRequestMessage
    {
        RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mediaItem.filename.S}"),
        Method = HttpMethod.Get,
        Headers = {
            { "id-token", id_Token },
            { "access-token", access_Token }
        }
    };

    async Task<string> func()
    {
        if (count == 24)
        {
            Thread.Sleep(1000);
            count = 0;
        }
        var response = await client.SendAsync(mediaRequest);
        count++;
        return await response.Content.ReadAsStringAsync();
    }
    tasks.Add(policy.ExecuteAsync(() => func()));
}

await Task.WhenAll(tasks);

foreach (var t in tasks)
{
    var postResponse = await t;
    urls.Add(postResponse);
}
Takahashi answered 25/10, 2020 at 1:16 Comment(2)
add a counter of some sort to your loop. At the end of the loop increase the counter and if counter % 25 == 0, wait – Asteria
See my edit, did I add this in the correct place? – Takahashi
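
A minimal sketch of the comment's suggestion, with the throttling in the loop body itself rather than inside the async delegate (where all the tasks run concurrently and the counter is unsynchronized). BuildRequest is a hypothetical stand-in for the request construction shown above; policy, tasks and client are as in the question:

var count = 0;
foreach (var mediaItem in uploadedMedia.Items)
{
    count++;
    if (count % 25 == 0)
        await Task.Delay(1000); // pause after every 25 queued requests

    var mediaRequest = BuildRequest(mediaItem); // hypothetical helper
    tasks.Add(policy.ExecuteAsync(async () =>
    {
        var response = await client.SendAsync(mediaRequest);
        return await response.Content.ReadAsStringAsync();
    }));
}
await Task.WhenAll(tasks);

Note that this only spaces out how quickly the requests are started; it does not account for retries or for how long each request stays in flight.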

There are many ways to do this; however, it's fairly easy to write a simple, thread-safe, reusable async rate limiter.

The advantage of the async approach is that it doesn't block thread-pool threads, it's fairly efficient, and it works well in existing async workflows and pipelines like TPL Dataflow and Reactive Extensions.

Example

// 3 calls every 3 seconds as an example
var rateLimiter = new RateLimiter(3, TimeSpan.FromSeconds(3));

// create some work
var task1 = Task.Run(async () =>
{
   for (var i = 0; i < 5; i++)
   {
      await rateLimiter.WaitAsync();
      Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
   }
});
var task2 = Task.Run(async () =>
{
   for (var i = 0; i < 5; i++)
   {
      await rateLimiter.WaitAsync();
      Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
   }
});
await Task.WhenAll(task1, task2);

Output

4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:15
4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
4 : 10/25/2020 05:16:24


Usage

private RateLimiter _rateLimiter = new RateLimiter(25, TimeSpan.FromSeconds(1));

...

async Task<string> func()
{
    await _rateLimiter.WaitAsync();

    var response = await client.SendAsync(mediaRequest);
    return await response.Content.ReadAsStringAsync();
}

Class

public class RateLimiter
{
   private readonly CancellationToken _cancellationToken;
   private readonly TimeSpan _timeSpan;
   private bool _isProcessing;
   private readonly int _count;
   private readonly Queue<DateTime> _completed = new Queue<DateTime>();
   private readonly Queue<TaskCompletionSource<bool>> _waiting = new Queue<TaskCompletionSource<bool>>();
   private readonly object _sync = new object();

   public RateLimiter(int count, TimeSpan timeSpan, CancellationToken cancellationToken = default)
   {
      _count = count;
      _timeSpan = timeSpan;
      _cancellationToken = cancellationToken;
   }

   private void Cleanup()
   {
      // if cancellation was requested, cancel all waiting items
      while (_cancellationToken.IsCancellationRequested && _waiting.Any())
         if (_waiting.TryDequeue(out var item))
            item.TrySetCanceled();

      _waiting.Clear();
      _completed.Clear();

      _isProcessing = false;
   }

   private async Task ProcessAsync()
   {
      try
      {
         while (true)
         {

            _cancellationToken.ThrowIfCancellationRequested();
            var time = DateTime.Now - _timeSpan;

            lock (_sync)
            {
               // remove anything out of date from the queue
               while (_completed.Any() && _completed.Peek() < time)
                  _completed.TryDequeue(out _);

               // signal waiting tasks to process
               while (_completed.Count < _count && _waiting.Any())
               {
                  if (_waiting.TryDequeue(out var item))
                     item.TrySetResult(true);
                  _completed.Enqueue(DateTime.Now);
               }

               if (!_waiting.Any() && !_completed.Any())
               {
                  Cleanup();
                  break;
               }
            }

            // wait until the oldest timestamp ages out of the rate window,
            // plus a small 20 ms buffer
            var delay = (_completed.Peek() - time) + TimeSpan.FromMilliseconds(20);

            if (delay.Ticks > 0)
               await Task.Delay(delay, _cancellationToken);
         }
      }
      catch (OperationCanceledException)
      {
         lock (_sync)
            Cleanup();
      }
   }

   public ValueTask WaitAsync()
   {
      // ReSharper disable once InconsistentlySynchronizedField
      _cancellationToken.ThrowIfCancellationRequested();

      lock (_sync)
      {
         try
         {
            if (_completed.Count < _count && !_waiting.Any())
            {
               _completed.Enqueue(DateTime.Now);
               return new ValueTask();
            }

            var tcs = new TaskCompletionSource<bool>();
            _waiting.Enqueue(tcs);
            return new ValueTask(tcs.Task);
         }
         finally
         {
            if (!_isProcessing)
            {
               _isProcessing = true;
               _ = ProcessAsync();
            }
         }
      }
   }
}

Note 1: It would be optimal to use this together with a limit on the maximum degree of parallelism, as sketched below.
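
For example, a rough sketch pairing the RateLimiter above with a SemaphoreSlim concurrency cap. client, uploadedMedia, id_Token and access_Token come from the question; the cap of 10 concurrent requests is an arbitrary choice:

var rateLimiter = new RateLimiter(25, TimeSpan.FromSeconds(1));
var throttler = new SemaphoreSlim(10);

var tasks = uploadedMedia.Items.Select(async mediaItem =>
{
    await throttler.WaitAsync();       // cap the degree of parallelism
    try
    {
        await rateLimiter.WaitAsync(); // respect the 25 requests/second limit
        var mediaRequest = new HttpRequestMessage
        {
            RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mediaItem.filename.S}"),
            Method = HttpMethod.Get,
            Headers = {
                { "id-token", id_Token },
                { "access-token", access_Token }
            }
        };
        var response = await client.SendAsync(mediaRequest);
        return await response.Content.ReadAsStringAsync();
    }
    finally
    {
        throttler.Release();
    }
}).ToList();

var urls = (await Task.WhenAll(tasks)).ToList();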

Note 2: Although I tested this, I only wrote it for this answer as a novel solution.

Nanice answered 25/10, 2020 at 5:24 Comment(2)
After seeing the implementation, I wouldn't describe it as simple or fairly easy! πŸ˜ƒ – Olney
@TheodorZoulias yeah it was simpler! in my head, before i wrote it – Nanice

I just scanned quickly over the code; perhaps another, similar solution would be to add a Thread.Sleep(calculatedDelay):

    foreach (var mediaItem in uploadedMedia.Items)
    {
        Thread.Sleep(calculatedDelay);
        var mediaRequest = new HttpRequestMessage

Where calculatedDelay is some value based on 1000/25, i.e. roughly 40 ms between requests.

However, I feel you need a better solution than a delay of some fixed value, as you cannot be sure of overhead and delay issues in transferring data. Also, you don't indicate what happens when you exceed the 25-call limit: how does the server respond? Do you get an error, or is it handled more elegantly? That is perhaps the area where you can find a more reliable solution.

Olympian answered 25/10, 2020 at 5:35 Comment(1)
Yes, I agree a better solution is needed than adding a delay. The server responds with a 'too many requests' error – Takahashi

The Polly library currently lacks a rate-limiting policy (requests/time). Fortunately this functionality is relatively easy to implement using a SemaphoreSlim. The trick to make the rate-limiting happen is to configure the capacity of the semaphore equal to the dividend (requests), and delay the Release of the semaphore for a time span equal to the divisor (time), after acquiring the semaphore. This way the rate limit will be applied consistently to any possible time window.
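
For illustration, a bare-bones sketch of that trick with a raw SemaphoreSlim, outside of any Polly policy (client is the HttpClient from the question; see the update below for the polished policy version):

// The semaphore's capacity is the dividend (25 requests); each acquired
// slot is released after the divisor (1 second), regardless of how long
// the request itself takes.
var semaphore = new SemaphoreSlim(25);

async Task<string> SendRateLimitedAsync(HttpRequestMessage request)
{
    await semaphore.WaitAsync();
    _ = Task.Delay(TimeSpan.FromSeconds(1))
        .ContinueWith(_ => semaphore.Release()); // delayed release
    var response = await client.SendAsync(request);
    return await response.Content.ReadAsStringAsync();
}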

Update: I realized that the Polly library is extensible, and allows implementing custom policies with custom functionality. So I'm scrapping my original suggestion in favor of the custom RateLimitAsyncPolicy class below:

public class RateLimitAsyncPolicy : AsyncPolicy
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimitAsyncPolicy(int maxOperationsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _semaphore = new SemaphoreSlim(maxOperationsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    protected override async Task<TResult> ImplementationAsync<TResult>(
        Func<Context, CancellationToken, Task<TResult>> action,
        Context context,
        CancellationToken cancellationToken,
        bool continueOnCapturedContext)
    {
        await _semaphore.WaitAsync(cancellationToken)
            .ConfigureAwait(continueOnCapturedContext);
        ScheduleSemaphoreRelease();
        return await action(context, cancellationToken).ConfigureAwait(false);
    }

    private async void ScheduleSemaphoreRelease()
    {
        await Task.Delay(_timeUnit);
        _semaphore.Release();
    }
}

This policy ensures that no more than maxOperationsPerTimeUnit operations will be started during any time window of timeUnit size. The duration of the operations is not taken into account. In other words no restriction is imposed on how many operations can be running concurrently at any given moment. This restriction can be optionally imposed by the BulkheadAsync policy. Combining these two policies (the RateLimitAsyncPolicy and the BulkheadAsync) is possible, as shown in the example below:

var policy = Policy.WrapAsync
(
    Policy
        .Handle<HttpRequestException>()
        .RetryAsync(retryCount: 3),

    new RateLimitAsyncPolicy(
        maxOperationsPerTimeUnit: 25, timeUnit: TimeSpan.FromSeconds(1)),

    Policy.BulkheadAsync( // Optional
        maxParallelization: 25, maxQueuingActions: Int32.MaxValue)
);

The order is important only for the RetryAsync policy, which must be placed first, for a reason explained in the documentation:

BulkheadPolicy: Usually innermost unless wraps a final TimeoutPolicy. Certainly inside any WaitAndRetry. The Bulkhead intentionally limits the parallelization. You want that parallelization devoted to running the delegate, not occupied by waits for a retry.

Similarly, the RateLimitAsyncPolicy must follow the Retry, so that each retry is considered an independent operation and counts towards the rate limit.

Olney answered 25/10, 2020 at 6:29 Comment(1)
In retrospect the RateLimitAsyncPolicy class is a bit leaky, because it starts some internal asynchronous operations (the ScheduleSemaphoreRelease method) that remain active for some time after the last use of the RateLimitAsyncPolicy. It's not a huge problem, but it can be an issue if you create multiple RateLimitAsyncPolicy instances, initialized with a long timeUnit timespan. A solution to this problem could be to make the class disposable, similarly to the RateLimiter class in this answer. – Olney
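
For illustration, one possible shape of that disposable variant (an untested sketch; the cancel-pending-releases-on-Dispose detail is an assumption, not part of the answer above):

public class DisposableRateLimitAsyncPolicy : AsyncPolicy, IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public DisposableRateLimitAsyncPolicy(int maxOperationsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _semaphore = new SemaphoreSlim(maxOperationsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    protected override async Task<TResult> ImplementationAsync<TResult>(
        Func<Context, CancellationToken, Task<TResult>> action,
        Context context,
        CancellationToken cancellationToken,
        bool continueOnCapturedContext)
    {
        await _semaphore.WaitAsync(cancellationToken)
            .ConfigureAwait(continueOnCapturedContext);
        ScheduleSemaphoreRelease();
        return await action(context, cancellationToken).ConfigureAwait(false);
    }

    private async void ScheduleSemaphoreRelease()
    {
        try { await Task.Delay(_timeUnit, _cts.Token); }
        catch (OperationCanceledException) { return; } // disposed: abandon the pending release
        _semaphore.Release();
    }

    // Disposing cancels any delayed releases that are still pending
    public void Dispose() => _cts.Cancel();
}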

You should use Microsoft's Reactive Framework (aka Rx) - add the NuGet package System.Reactive and using System.Reactive.Linq; - then you can do this:

HttpRequestMessage MakeMessage(MediaItem mi) => new HttpRequestMessage
{
    RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mi.filename}"),
    Method = HttpMethod.Get,
    Headers = {
        { "id-token", id_Token },
        { "access-token", access_Token }
    }
};

var urls = await
    uploadedMedia
        .Items
        .ToObservable()
        .Buffer(24)        // batches of at most 24 requests
        .Zip(              // release one batch per second
            Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0)),
            (mrs, _) => mrs)
        .SelectMany(mrs => mrs
            .ToObservable() // send every request in the batch
            .SelectMany(mr => Observable.FromAsync(() => client.SendAsync(MakeMessage(mr)))))
        .SelectMany(x => Observable.FromAsync(() => x.Content.ReadAsStringAsync()))
        .ToList();

I haven't been able to test it, but it should be fairly close.

Dirichlet answered 29/10, 2020 at 11:21 Comment(0)
