Task Parallel Library - Custom Task Schedulers

I have a requirement to fire off web service requests to an online api and I thought that Parallel Extensions would be a good fit for my needs.

The web service in question is designed to be called repeatedly, but has a mechanism that charges you if you go over a certain number of calls per second. I obviously want to minimize my charges and so was wondering if anyone has seen a TaskScheduler that can cope with the following requirements:

  1. Limit the number of tasks scheduled per timespan. I guess if the number of requests exceeded this limit, it would need to throw away the task or possibly block (to stop a backlog of tasks building up)?
  2. Detect whether the same request is already in the scheduler waiting to be executed and, if so, not queue the second task but return the first instead.
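To illustrate what I mean by #2, something like memoizing the pending tasks by request key is roughly what I'm after. This is only a sketch; the string key and the fetch delegate are placeholders, not a real API:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class RequestDeduplicator
{
    // Tasks for requests that are currently in flight, keyed by request.
    private readonly ConcurrentDictionary<string, Task<string>> _pending =
        new ConcurrentDictionary<string, Task<string>>();

    public Task<string> GetAsync(string request, Func<string, Task<string>> fetch)
    {
        return _pending.GetOrAdd(request, key =>
        {
            Task<string> task = fetch(key);
            // Drop the entry once the call finishes (success or failure),
            // so later identical requests are issued fresh.
            task.ContinueWith(
                _ => { Task<string> removed; _pending.TryRemove(key, out removed); },
                TaskContinuationOptions.ExecuteSynchronously);
            return task;
        });
    }
}
```

If two callers ask for the same request while the first is still pending, they get the same `Task<string>` back rather than triggering a second web call.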

Do people feel that these are the sorts of responsibilities a task scheduler should be dealing with, or am I barking up the wrong tree? If you have alternatives, I am open to suggestions.

Fredericafrederich answered 20/3, 2012 at 21:54 Comment(3)
I think at least #2 is impossible to do with a TaskScheduler, because it deals with Tasks and there is no way to get that information out of them.Superintendent
Would you be interested in a solution that uses C# 5/.Net 4.5?Superintendent
@Superintendent absolutely, I am fortunate enough to be allowed to use C# 5Fredericafrederich

I agree with others that TPL Dataflow sounds like a good solution for this.

To limit the processing, you could create a TransformBlock that doesn't actually transform the data in any way, it just delays it if it arrived too soon after the previous data:

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
    DateTime lastItem = DateTime.MinValue;
    return new TransformBlock<T, T>(
        async x =>
        {
            var waitTime = lastItem + delay - DateTime.UtcNow;
            if (waitTime > TimeSpan.Zero)
                await Task.Delay(waitTime);

            lastItem = DateTime.UtcNow;

            return x;
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}

Then create a method that produces the data (for example integers starting from 0):

static async Task Producer(ITargetBlock<int> target)
{
    int i = 0;
    while (await target.SendAsync(i))
        i++;
}

It's written asynchronously, so that if the target block isn't able to process the items right now, it will wait.

Then write a consumer method:

static void Consumer(int i)
{
    Console.WriteLine(i);
}

And finally, link it all together and start it up:

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);

Here, delayBlock will accept at most one item every 500 ms and the Consumer() method can run multiple times in parallel. To finish processing, call delayBlock.Complete().

If you want to add some caching per your #2, you could create another TransformBlock, do the work there, and link it to the other blocks.
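For example, a caching block could be as simple as wrapping the transform in a dictionary lookup. This is only a sketch, and it assumes cached results never need to expire:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

static class CachingBlock
{
    // Wraps a transform in a cache, so duplicate inputs reuse the first
    // result instead of invoking the transform again.
    public static IPropagatorBlock<TIn, TOut> Create<TIn, TOut>(Func<TIn, TOut> transform)
    {
        var cache = new ConcurrentDictionary<TIn, TOut>();
        return new TransformBlock<TIn, TOut>(x => cache.GetOrAdd(x, transform));
    }
}
```

You would link it into the pipeline with LinkTo() the same way as the other blocks.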

Superintendent answered 21/3, 2012 at 17:6 Comment(3)
Bingo, that's exactly what I had in mind. Good on ya for providing an actual implementation, I just haven't been able to find the time.Biogenesis
Neat. One thing to note, if you're running the async CTP and not .NET 4.5, you'll need to change Task.Delay to TaskEx.Delay.Mordy
@DPeden, I think you mean .Net 4.5 (currently in beta).Superintendent

Honestly, I would work at a higher level of abstraction and use the TPL Dataflow API for this. The only catch is that you would need to write a custom block that throttles the requests at the rate you need because, by default, blocks are "greedy" and will just process as fast as possible. The implementation would be something like this:

  1. Start with a BufferBlock<T> which is the logical block that you would post to.
  2. Link the BufferBlock<T> to a custom block which has the knowledge of requests/sec and throttling logic.
  3. Link the custom block from 2 to your ActionBlock<T>.

I don't have the time to write the custom block for #2 right this second, but I will check back later and try to fill in an implementation for you if you haven't already figured it out.
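In the meantime, here is a rough sketch of the shape of that pipeline, with a plain Task.Delay-based TransformBlock standing in for the real throttling block (the interval, the int payload, and the names are all placeholders):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class ThrottledPipeline
{
    public static async Task<IReadOnlyList<int>> Run(IEnumerable<int> requests, TimeSpan interval)
    {
        var processed = new List<int>();

        // 1. The logical block callers post requests to.
        var buffer = new BufferBlock<int>();

        // 2. A pacing stage: releases at most one item per interval.
        var throttle = new TransformBlock<int, int>(
            async x => { await Task.Delay(interval); return x; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // 3. The block that would actually issue the web request;
        //    here it just records the item (default MaxDegreeOfParallelism = 1).
        var worker = new ActionBlock<int>(x => processed.Add(x));

        buffer.LinkTo(throttle, new DataflowLinkOptions { PropagateCompletion = true });
        throttle.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var request in requests)
            buffer.Post(request);
        buffer.Complete();

        await worker.Completion;
        return processed;
    }
}
```

Calling `ThrottledPipeline.Run(Enumerable.Range(0, 5), TimeSpan.FromMilliseconds(500))` would push five requests through at roughly one per half second.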

Biogenesis answered 20/3, 2012 at 22:31 Comment(1)
As I haven't looked into TPL Dataflow at all, I would really appreciate your help.Fredericafrederich

I haven't used RX much, but AFAICT the Observable.Window method would work fine for this.

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window(VS.103).aspx

It would seem to be a better fit than Throttle, which appears to throw elements away; I'm guessing that's not what you want.
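For instance, zipping the request stream against a timer paces it without losing anything. A sketch only (I haven't run this against the current Rx bits, and it uses Zip rather than Window to do the work):

```csharp
using System;
using System.Reactive.Linq;

static class RxPacing
{
    // Pairs each request with a timer tick, so at most one request is
    // released per interval and none are thrown away.
    public static IObservable<T> Pace<T>(IObservable<T> source, TimeSpan interval)
    {
        return source.Zip(Observable.Interval(interval), (request, _) => request);
    }
}
```

Usage would be something like `RxPacing.Pace(requests, TimeSpan.FromMilliseconds(500)).Subscribe(CallService)`, where CallService is whatever actually issues the web request.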

Stringhalt answered 21/3, 2012 at 3:41 Comment(0)

If you need to throttle by time, you should check out Quartz.net. It can facilitate consistent polling. If you care about all requests, you should consider using some sort of queueing mechanism. MSMQ is probably the right solution but there are many specific implementations if you want to go bigger and use an ESB like NServiceBus or RabbitMQ.

Update:

In that case, TPL Dataflow is your best bet if you can leverage the CTP. A throttled BufferBlock is the way to go.

This example comes from the documentation provided by Microsoft:

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async Task Producer()
{
    while(true)
    {
        await m_buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await m_buffer.ReceiveAsync());
    }
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Update:

Check out RX's Observable.Throttle.

Mordy answered 20/3, 2012 at 22:3 Comment(4)
I wanted to keep everything in process, so I have ruled out queueing as a solution. I don't think Quartz is the right solution either; the scheduling is not the issue, it's throttling the number of requests and being a bit smart about the calls that I make. Thanks for the suggestions, though.Fredericafrederich
@DarenFox How exactly does this prevent more than N requests per second? Looks like you're only limiting to 10 concurrent outstanding calls, but obviously if the calls are faster than 10ms each that could exceed the per second restriction. FWIW, I was answering that TPL dataflow is probably the best way to go too, but you technically need a temporal BufferBlock implementation.Biogenesis
@DarenFox This was my concern as well but it seemed to be discarded based upon DarrenFox's last comment. Nonetheless, I think RX may be the solution. I updated my answer again with another option.Mordy
From what I understand of RX, Throttle wouldn't help, as it is a selector on the observable. This means that requests would still need to be made to the web service but just wouldn't be returned in the observable until there had been no more requests in the time period. I actually have a partial implementation of my requirement working using Observable.Timer, but it is growing clunky, which is why I have taken a step back to evaluate other options.Fredericafrederich
