Partition: How to add a wait after every partition

I have an API that accepts 20 requests per minute; after that, I need to wait for 1 minute before querying it again. I have a list of items (usually 1000+) whose details I need to query from the API. My first thought was to use Partitioner to split the list into partitions of 20 items/requests, but I soon realized that Partitioner does not work like that. My second thought was to add a delay inside the partition, but that is a bad idea too: as I understand it, this adds a delay after every request, which is not needed. Instead, I need a delay after every partition. Below is my code:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}

Does anyone know how I can accomplish this?

Misdeed answered 21/1, 2021 at 10:41 Comment(8)
Related: How to execute tasks in parallel but not more than N tasks per T seconds? – Contagious
Does the duration of the requests count towards the requests-per-minute policy? In other words, are you allowed to start 20 requests per minute (independently of their duration), or must you wait for a minute after the completion of the 20 previous requests? – Contagious
@TheodorZoulias No, it does not. What matters is 20 calls per minute. – Misdeed
Related: Add delay to parallel API call (Polly) – Contagious
As a side note, be aware that your current implementation of ForEachAsync (which is probably a modified version of the last ForEachAsync in this article) handles exceptions in a non-ideal way. The reasons are explained in the comments of this answer. – Contagious
@TheodorZoulias I have eliminated the ForEachAsync. The RateLimiter in your previous comment seems to work; I am currently testing it and will get back. – Misdeed
You mean the RateLimiter class from this answer? That's a fairly complicated piece of code. It's beyond my capabilities to review it and confirm its correctness. – Contagious
Another related question: Simple way to rate limit HttpClient requests – Contagious

Here is a RateLimiter class that you could use in order to limit the frequency of the asynchronous operations. It is a simpler implementation of the RateLimiter class that is found in this answer.

/// <summary>
/// Limits the number of workers that can access a resource, during the specified
/// time span.
/// </summary>
public class RateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Schedule the release of the semaphore using a Timer.
        // Use the newly created Timer object as the state object, to prevent GC.
        // Handle the unlikely case that the _timeUnit is invalid.
        System.Threading.Timer timer = new(_ => _semaphore.Release());
        try { timer.Change(_timeUnit, Timeout.InfiniteTimeSpan); }
        catch { _semaphore.Release(); throw; }
    }
}

Usage example:

List<string> urls = GetUrls();

RateLimiter rateLimiter = new(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));
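
To see the limiter at work without calling a real API, the usage above can be reduced to a timing experiment. The following is a minimal sketch, not the original online demo: RateLimiterDemo and RunAsync are hypothetical names, the RateLimiter class is a condensed copy of the one above (argument validation omitted) so that the block compiles on its own, and the small limits are chosen only so that it finishes quickly.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Condensed copy of the semaphore-based RateLimiter shown above
// (argument validation omitted), repeated so the sketch compiles alone.
public class RateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Release the acquired slot one time unit from now.
        System.Threading.Timer timer = new(_ => _semaphore.Release());
        try { timer.Change(_timeUnit, Timeout.InfiniteTimeSpan); }
        catch { _semaphore.Release(); throw; }
    }
}

public static class RateLimiterDemo
{
    // Hypothetical helper: runs itemCount fake operations through the limiter
    // and returns the elapsed milliseconds at which each one was allowed to start.
    public static async Task<long[]> RunAsync(
        int itemCount, int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        RateLimiter rateLimiter = new(maxActionsPerTimeUnit, timeUnit);
        Stopwatch stopwatch = Stopwatch.StartNew();
        return await Task.WhenAll(Enumerable.Range(1, itemCount).Select(async i =>
        {
            await rateLimiter.WaitAsync();
            long startedAt = stopwatch.ElapsedMilliseconds;
            // A real program would call the API here instead of printing.
            Console.WriteLine($"Item {i} started at {startedAt} ms");
            return startedAt;
        }));
    }
}
```

Calling await RateLimiterDemo.RunAsync(7, 3, TimeSpan.FromMilliseconds(500)) should show the items starting in groups of at most three, roughly 500 ms apart; the exact timestamps depend on the timer resolution of the machine.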

The Timer is constructed with this specific constructor to prevent it from being garbage collected until it fires, as explained in this answer by Nick H.

Note: This implementation is slightly leaky, in the sense that it internally creates disposable System.Threading.Timer objects that are not disposed when you are finished using the RateLimiter. Any active timers will prevent the RateLimiter from being garbage collected until they have fired their callbacks. Also, the SemaphoreSlim is not disposed as it should be. These are minor flaws that are unlikely to affect a program that creates only a handful of RateLimiters. If you intend to create a lot of them, you could take a look at the 3rd revision of this answer, which features a disposable RateLimiter based on the Task.Delay method.
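
For readers who cannot consult that revision, here is a rough sketch of what a disposable, Task.Delay-based variant might look like. It is not the code of the 3rd revision: the class name DisposableRateLimiter and its internals are assumptions made for illustration. The idea is that each acquired slot is given back by a delayed task instead of a Timer, and that Dispose cancels all pending delays through a shared CancellationTokenSource.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical disposable variant; a sketch, not the code of the referenced revision.
public sealed class DisposableRateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts = new();
    private int _disposed; // 0 = alive, 1 = disposed

    public DisposableRateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Fire-and-forget: give the slot back after one time unit,
        // unless the limiter is disposed first.
        _ = ReleaseAfterDelayAsync();
    }

    private async Task ReleaseAfterDelayAsync()
    {
        try
        {
            await Task.Delay(_timeUnit, _disposeCts.Token).ConfigureAwait(false);
            _semaphore.Release();
        }
        catch (OperationCanceledException) { } // The limiter was disposed while waiting.
        catch (ObjectDisposedException) { }    // Lost a race with Dispose.
    }

    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return; // Already disposed.
        _disposeCts.Cancel(); // Cancels all pending delays.
        _disposeCts.Dispose();
        _semaphore.Dispose();
    }
}
```

One limitation of this sketch: callers that are still waiting for a slot when Dispose is called are not completed or canceled, so the limiter should be disposed only after all work that uses it has finished.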


Here is an alternative implementation of the RateLimiter class, more complex, which is based on the Environment.TickCount64 property instead of a SemaphoreSlim. It has the advantage that it doesn't create fire-and-forget timers in the background. The disadvantages are that the WaitAsync method does not support a CancellationToken argument, and that the probability of bugs is higher because of the complexity.

public class RateLimiter
{
    private readonly Queue<long> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly int _timeUnitMilliseconds;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _queue = new Queue<long>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
    }

    public Task WaitAsync()
    {
        int delayMilliseconds = 0;
        lock (_queue)
        {
            long currentTimestamp = Environment.TickCount64;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                long refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
                Debug.Assert(delayMilliseconds >= 0);
                if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delayMilliseconds
                + _timeUnitMilliseconds);
        }
        if (delayMilliseconds == 0) return Task.CompletedTask;
        return Task.Delay(delayMilliseconds);
    }
}
Contagious answered 21/1, 2021 at 14:53 Comment(7)
This works amazingly well and simplifies my work, no need for partitioning. Thank you. – Misdeed
I don't know if I have done something wrong, but when I use this RateLimiter component, declared as using var rateLimiter = new RateLimiter(10, TimeSpan.FromMinutes(3));, it takes almost 30 minutes to process just 10 requests. What could be wrong with my implementation? – Templas
@Templas My guess is that if you comment out the await rateLimiter.WaitAsync();, it will still take 30 minutes for 10 requests. Could you test it and see what happens? – Contagious
It takes just a few seconds to complete 10 requests, but most return empty responses due to the rate limit of the API I am calling. – Templas
@Templas Then my guess is that the average duration of a valid operation (one that is not rejected by the remote server) is 3 minutes. Otherwise there is some bug in the RateLimiter that has so far gone undetected. Please see if the first version of this component works as expected. – Contagious
Why doesn't it implement IDisposable? A using declaration requires IDisposable, right? – Templas
@Templas The using was a mistake. I just fixed it. It was a remnant from the 3rd revision of this answer, which featured a disposable RateLimiter. – Contagious
