Queuing asynchronous task in C#
E

5

6

I have a few methods that report data to a database. We want to invoke all calls to the data service (DS) asynchronously. These calls are scattered throughout the code, so we want to make sure that the DS calls are executed one after another, in order, at any given time. Initially, I was using async/await on each of these methods and each call was executed asynchronously, but we found that if the calls run out of sequence there is room for errors.

So I thought we should queue all these asynchronous tasks and send them on a separate thread, but I want to know what options we have. I came across 'SemaphoreSlim'. Would it be appropriate for my use case? If not, what other options would suit it? Please guide me.

Here is what I currently have in my code:

public static SemaphoreSlim mutex = new SemaphoreSlim(1);

//first DS call 

 public async Task SendModuleDataToDSAsync(Module parameters)
    {
        var tasks1 = new List<Task>();
        var tasks2 = new List<Task>();

        //await mutex.WaitAsync(); **//is this correct way to use SemaphoreSlim ?**
        foreach (var setting in Module.param)
        {
           Task job1 = SaveModule(setting);
           tasks1.Add(job1);
           Task job2= SaveModule(GetAdvancedData(setting));
           tasks2.Add(job2);
        }

        await Task.WhenAll(tasks1);
        await Task.WhenAll(tasks2);

        //mutex.Release(); // **is this correct?**
    }

 private async Task SaveModule(Module setting)
    {
        await Task.Run(() =>
            {
             // Invokes Calls to DS
             ... 
            });
    }

//somewhere down the main thread, invoking second call to DS

  //Second DS Call
 private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
 {
    //await mutex.WaitAsync();// **is this correct?**
    await Task.Run(() =>
            {
                 //TrackInstrumentInfoToDS
                 //mutex.Release();// **is this correct?**
            });
    if(param2)
    {
        await Task.Run(() =>
               {
                  //TrackParam2InstrumentInfoToDS
               });
    }
 }


Electrodeposit answered 2/4, 2020 at 4:12 Comment(6)
This post explains why you can't use a lock statement with await. A lock statement is only syntactic sugar for a mutex. You have to use a SemaphoreSlim for async locks. If you use Nito.AsyncEx the usage is simple: using var lockHandle = await this._lock.LockAsync(). – Falange
When you say that you want to invoke the calls to the Data Service asynchronously, do you mean in fire-and-forget fashion? Also, is your application of type ASP.NET by any chance? In case both are true, you should take a look at this. – Roesch
Does the first DS call always need to run before the second DS call? Is there any chance they run simultaneously in an uncertain order at the moment? And how many other DS calls do you have? – Stereotomy
@Stereotomy: Yes, I want it as fire and forget, not really awaiting any response. We are using .NET. And yes, I need the first DS calls to be executed before the second one. The first DS call is more like making the prerequisite ready for the second one. So far, we just have 2 DS calls that I want to track asynchronously. – Electrodeposit
As a side note, be aware that HttpClient is intended to be instantiated once and reused throughout the life of an application. Instantiating an HttpClient for every request will exhaust the number of sockets available under heavy loads. (citation) – Roesch
@TheodorZoulias: Yes, I understand that. This is only an example I'm using to help me use BlockingCollection in async code. Thanks. – Electrodeposit
R
11

Initially, I was using async/await on each of these methods and each call was executed asynchronously, but we found that if the calls run out of sequence there is room for errors.

So I thought we should queue all these asynchronous tasks and send them on a separate thread, but I want to know what options we have. I came across 'SemaphoreSlim'.

SemaphoreSlim does restrict asynchronous code to running one at a time, and is a valid form of mutual exclusion. However, since "out of sequence" calls can cause errors, then SemaphoreSlim is not an appropriate solution since it does not guarantee FIFO.

In a more general sense, no synchronization primitive guarantees FIFO because that can cause problems due to side effects like lock convoys. On the other hand, it is natural for data structures to be strictly FIFO.

So, you'll need to use your own FIFO queue, rather than having an implicit execution queue. Channels is a nice, performant, async-compatible queue, but since you're on an older version of C#/.NET, BlockingCollection<T> would work:

public sealed class ExecutionQueue
{
  private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();

  public ExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

The only tricky part with this setup is how to queue work. From the perspective of the code queueing the work, they want to know when the lambda is executed, not when the lambda is queued. From the perspective of the queue method (which I'm calling Run), the method needs to complete its returned task only after the lambda is executed. So, you can write the queue method something like this:

public Task Run(Func<Task> lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(async () =>
  {
    // Execute the lambda and propagate the results to the Task returned from Run
    try
    {
      await lambda();
      tcs.TrySetResult(null);
    }
    catch (OperationCanceledException ex)
    {
      tcs.TrySetCanceled(ex.CancellationToken);
    }
    catch (Exception ex)
    {
      tcs.TrySetException(ex);
    }
  });
  return tcs.Task;
}

This queueing method isn't as perfect as it could be. If a task completes with more than one exception (this is normal for parallel code), only the first one is retained (this is normal for async code). There's also an edge case around OperationCanceledException handling. But this code is good enough for most cases.

Now you can use it like this:

public static ExecutionQueue _queue = new ExecutionQueue();

public async Task SendModuleDataToDSAsync(Module parameters)
{
  var tasks1 = new List<Task>();
  var tasks2 = new List<Task>();

  foreach (var setting in Module.param)
  {
    Task job1 = _queue.Run(() => SaveModule(setting));
    tasks1.Add(job1);
    Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
    tasks2.Add(job2);
  }

  await Task.WhenAll(tasks1);
  await Task.WhenAll(tasks2);
}
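
For readers who are not tied to older tooling, the Channels option mentioned above might be sketched roughly as follows. This is only an illustration under assumptions: the names ChannelExecutionQueue and ChannelQueueDemo are hypothetical, and System.Threading.Channels is assumed to be available (it ships with .NET Core 3.0 and later, and as a NuGet package for older frameworks). The structure mirrors ExecutionQueue: a single consumer loop awaits each queued lambda in turn, and Run completes its task only after the lambda has executed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical Channels-based counterpart of ExecutionQueue (the name is an assumption).
public sealed class ChannelExecutionQueue
{
    // Unbounded channel with one reader: items are read in the order they were written.
    private readonly Channel<Func<Task>> _channel =
        Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions { SingleReader = true });

    // The consumer loop yields at its first await, so the constructor does not block.
    public ChannelExecutionQueue() => Completion = ProcessQueueAsync();

    // Completes after Complete() has been called and every queued item has run.
    public Task Completion { get; }

    public void Complete() => _channel.Writer.TryComplete();

    public Task Run(Func<Task> lambda)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        bool accepted = _channel.Writer.TryWrite(async () =>
        {
            // Execute the lambda and propagate its outcome to the task returned from Run.
            try
            {
                await lambda().ConfigureAwait(false);
                tcs.TrySetResult(true);
            }
            catch (OperationCanceledException ex)
            {
                tcs.TrySetCanceled(ex.CancellationToken);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        });
        // On an unbounded channel, TryWrite fails only after the writer was completed.
        if (!accepted)
            throw new InvalidOperationException("The queue has already been completed.");
        return tcs.Task;
    }

    private async Task ProcessQueueAsync()
    {
        while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out var work))
                await work().ConfigureAwait(false); // one item at a time, in FIFO order
        }
    }
}

public static class ChannelQueueDemo
{
    // Queues three jobs whose delays shrink, so unordered execution would finish them reversed.
    public static async Task<List<int>> RunAsync()
    {
        var queue = new ChannelExecutionQueue();
        var order = new List<int>();
        var jobs = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            int id = i;
            jobs.Add(queue.Run(async () =>
            {
                await Task.Delay(30 - 10 * id);
                order.Add(id);
            }));
        }
        await Task.WhenAll(jobs);
        queue.Complete();       // no more work will be added
        await queue.Completion; // wait for the consumer loop to drain and exit
        return order;           // 0, 1, 2: the order in which the jobs were queued
    }
}
```

Calling Complete() and awaiting Completion before the process exits is also what keeps queued work from being dropped when the application shuts down.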
Rotz answered 2/4, 2020 at 13:33 Comment(21)
@Stephen Cleary: Is IAsyncDisposable a C# 8 feature? Can I just use the Run method within the ExecutionQueue class? I don't see the use of ProcessQueueAsync, so can I assume we can only use the Run method inside of ExecutionQueue? Also, since we are returning the results after running the lambda, is it assured that tasks executed through Run are in order? – Electrodeposit
Also, with Task job1 = _queue.Run(() => SaveModule(setting)); does the job get executed before it is added to tasks1? So is 'tasks1.Add(job1)' of no use? – Electrodeposit
@Stephen Cleary: We are using C# 7 and I couldn't get this to work. I'm stuck. – Electrodeposit
@jamilia: I've updated the code to use older tech. Put the Run method in the ExecutionQueue class. ProcessQueueAsync is called from the constructor. The tasks will be run in order; this is guaranteed by the explicit queue. When calling Run, the job may begin executing at any time; adding it to tasks1 is still valid because you need to await the result regardless of whether it completed already or in the future. – Rotz
@StephenCleary: Thanks. I see that GetConsumingEnumerable blocks while there are no items in the queue. When creating an instance of ExecutionQueue, the constructor gets invoked and is blocked as soon as ProcessQueueAsync is invoked. I need to be able to add items to the queue only after instantiating the ExecutionQueue class, but I'm being blocked. Thoughts, please? – Electrodeposit
@jamilia: Fixed with Task.Run. – Rotz
Thanks again. I don't know what mistake I'm making, but I'm unable to get this working. tcs.Task has status 'WaitingForActivation'. At some point it does get to 'await lambda', but it doesn't execute the method that was added to the queue. – Electrodeposit
I added an image of the code snippet I was trying, where it didn't work out. Please let me know what I am missing. – Electrodeposit
@Electrodeposit: You are not awaiting someTask inside the Main method. If you are using C# 7.1 or later, you can have an async Main method. Otherwise, instead of await someTask, use someTask.GetAwaiter().GetResult(). – Roesch
@StephenCleary: I thought that as soon as the queue starts filling it should activate, but it is still in WaitingForActivation. Also, when should _queue.CompleteAdding(); be invoked to close the thread? – Electrodeposit
@jamilia: WaitingForActivation is completely normal. You can invoke Complete whenever you're done adding work to the queue. – Rotz
@StephenCleary: My code is working randomly. Most of the time, only the first method in the queue is executed and it exits. Sometimes 2 methods are executed, without any code change. At any given time, all the tasks that were queued should run with this implementation, correct? What am I missing here? – Electrodeposit
Is your application exiting before the queue completes? – Rotz
@Stephen Cleary: Yes, it seems the application exits before the queue completes. I'm trying this in a sample test application and will apply it in my project once I get it working. So, even if my application exits, the queue should finish processing all the tasks in it, correct? Why would it not run the rest of the items? – Electrodeposit
No; if your application exits, then all threads are terminated. You'll need to keep your application open until the queue completes. – Rotz
@StephenCleary: Got you! One more thing: I run this in a WCF service, and I have two different classes that make calls to DS, which are added to the static queue. Calling CompleteAdding is now a problem: after my activity finishes the service is still up, and the activity can be invoked any number of times, so I have to allow adding to the queue for every new run. A new instance for every run would be apt, but I can't make the queue non-static because I have two different classes and I want to use the same queue for both DS calls. Any advice on this, please? – Electrodeposit
@jamilia: You'll need to keep track of how many different activities are currently using it, and block the WCF shutdown until that number reaches zero. I have no idea how to do this on WCF. – Rotz
@StephenCleary: Thanks. I won't be able to track that, because as long as the service is up any number of activities can run at any time. But how important is it to call CompleteAdding? I may have this consumer listening on one thread forever until my service is down; it could be days. And will there be any impact on a queue that waits that long for an item to be added? – Electrodeposit
You only need to call CompleteAdding if you need to wait for the queue to complete. The queue, once created, will happily wait any amount of time for items to be added. – Rotz
@StephenCleary: OK, thanks. I believe in my use case I may skip calling CompleteAdding, as my queue has to accept items as long as the service is up. Thanks so much for your complete guidance. You have been such a great help! – Electrodeposit
In the first block of code (the ExecutionQueue class), in the 3rd line: public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync()); Shouldn't it be Completion instead of Complete? @StephenCleary – Taeniafuge
P
2

Here's a compact solution that has the least amount of moving parts but still guarantees FIFO ordering (unlike some of the suggested SemaphoreSlim solutions). There are two overloads for Enqueue so you can enqueue tasks with and without return values.

using System;
using System.Threading;
using System.Threading.Tasks;

public class TaskQueue
{
    private Task _previousTask = Task.CompletedTask;

    public Task Enqueue(Func<Task> asyncAction)
    {
        return Enqueue(async () => { 
            await asyncAction().ConfigureAwait(false);
            return true; 
        });
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> asyncFunction)
    {
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        // get predecessor and wait until it's done. Also atomically swap in our own completion task.
        await Interlocked.Exchange(ref _previousTask, tcs.Task).ConfigureAwait(false);
        try
        { 
            return await asyncFunction().ConfigureAwait(false); 
        }
        finally 
        { 
            tcs.SetResult(); 
        }
    }
}
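
One caveat for the asker's environment: the non-generic TaskCompletionSource used above only exists from .NET 5 onward. On older frameworks the same chaining idea could be sketched with TaskCompletionSource<bool> instead. The class names LegacyTaskQueue and LegacyTaskQueueDemo below are hypothetical, and the sketch assumes .NET Framework 4.6 or later (for Task.CompletedTask and RunContinuationsAsynchronously).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of TaskQueue for runtimes without the non-generic TaskCompletionSource.
public class LegacyTaskQueue
{
    private Task _previousTask = Task.CompletedTask;

    public async Task Enqueue(Func<Task> asyncAction)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        // Atomically publish our own completion task and fetch the predecessor's.
        // This runs synchronously inside the Enqueue call, so queue order equals call order.
        await Interlocked.Exchange(ref _previousTask, tcs.Task).ConfigureAwait(false);
        try
        {
            await asyncAction().ConfigureAwait(false);
        }
        finally
        {
            // Always signal the successor, even if asyncAction threw.
            tcs.SetResult(true);
        }
    }
}

public static class LegacyTaskQueueDemo
{
    public static async Task<string> RunAsync()
    {
        var queue = new LegacyTaskQueue();
        var log = new List<string>();

        // The first job is slower, yet it must still finish before the second one starts.
        Task first = queue.Enqueue(async () => { await Task.Delay(50); log.Add("module"); });
        Task second = queue.Enqueue(async () => { await Task.Delay(5); log.Add("instrument"); });

        await Task.WhenAll(first, second);
        return string.Join(",", log); // "module,instrument"
    }
}
```

Because the completion task is only ever set to a result, a failed job surfaces its exception to its own caller without breaking the chain for the jobs queued behind it.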
Phlebitis answered 9/1, 2023 at 14:44 Comment(0)
F
1

Please keep in mind that your first solution, which adds all tasks to lists, doesn't ensure that the tasks are executed one after another. They all run in parallel because each task is started before the previous one has been awaited.

So yes, you have to use a SemaphoreSlim to combine async locking with await. A simple implementation might be:

private readonly SemaphoreSlim _syncRoot = new SemaphoreSlim(1);

public async Task SendModuleDataToDSAsync(Module parameters)
{
    await this._syncRoot.WaitAsync();
    try
    {
        foreach (var setting in Module.param)
        {
           await SaveModule(setting);
           await SaveModule(GetAdvancedData(setting));
        }
    }
    finally
    {
        this._syncRoot.Release();
    }
}

If you can use Nito.AsyncEx the code can be simplified to:

public async Task SendModuleDataToDSAsync(Module parameters)
{
    using var lockHandle = await this._syncRoot.LockAsync();

    foreach (var setting in Module.param)
    {
       await SaveModule(setting);
       await SaveModule(GetAdvancedData(setting));
    }
}
Falange answered 2/4, 2020 at 4:40 Comment(0)
S
0

One option is to queue operations that will create tasks instead of queuing already running tasks as the code in the question does.

PseudoCode without locking:

 Queue<Func<Task>> tasksQueue = new Queue<Func<Task>>();

 async Task RunAllTasks()
 {
      while (tasksQueue.Count > 0)
      { 
           var taskCreator = tasksQueue.Dequeue(); // get creator 
           var task = taskCreator(); // starting one task at a time here
           await task; // wait till task completes
      }
  }

  // note that declaring createSaveModuleTask does not  
  // start SaveModule task - it will only happen after this func is invoked
  // inside RunAllTasks
  Func<Task> createSaveModuleTask = () => SaveModule(setting);

  tasksQueue.Enqueue(createSaveModuleTask);
  tasksQueue.Enqueue(() => SaveModule(GetAdvancedData(setting)));
  // no DB operations started at this point

  // this will start tasks from the queue one by one.
  await RunAllTasks();

Using ConcurrentQueue would likely be the right thing in actual code. You would also need to know the total number of expected operations, so that you can stop once all of them have been started and awaited one after another.
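
To make the distinction between queuing task creators and queuing already running tasks concrete, here is a small self-contained sketch. It assumes a stand-in method FakeSave in place of SaveModule, and the class name DeferredStartDemo is hypothetical. It records when each operation starts and ends, to show that nothing starts while the delegates are merely sitting in a ConcurrentQueue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DeferredStartDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var pending = new ConcurrentQueue<Func<Task>>();

        // FakeSave is a hypothetical stand-in for SaveModule.
        async Task FakeSave(string name)
        {
            log.Add("start " + name);
            await Task.Delay(10);
            log.Add("end " + name);
        }

        // Enqueue delegates, not tasks: FakeSave is not invoked here.
        pending.Enqueue(() => FakeSave("A"));
        pending.Enqueue(() => FakeSave("B"));
        log.Add("queued");

        // Single consumer: start one operation, await it, then move to the next.
        while (pending.TryDequeue(out var createTask))
            await createTask();

        // log is now: queued, start A, end A, start B, end B
        return log;
    }
}
```

Had the queue held Task instances (for example by enqueuing FakeSave("A") directly), both operations would already be running before the loop began, which is the situation in the question's code.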

Stradivari answered 2/4, 2020 at 4:59 Comment(6)
I don't see where exactly the queue-adding and the await RunAll.. are located. Assuming that tasksQueue is a member of the class and the code after } is in the called method SendModuleDataToDSAsync, how does it ensure that all tasks run one after another, even if you're using a ConcurrentQueue? If you call SendModuleDataToDSAsync twice, the last line await RunAll.. is called twice in parallel. Maybe a BlockingCollection and a totally separate Task will help. But maybe I'm missing the point and your solution works correctly. – Falange
@SebastianSchumann The main point of the example is to show how to queue not yet started tasks. It's really not a complete solution. I assumed (probably incorrectly) that OP wanted to serialize all operations - so having one queue with one consumer (triggered by single call to RunAllTasks) would guarantee that all operations are executed in order (by using queue) and sequentially (by using single runner)... – Stradivari
Thank you for the suggestions. For the code inside the loop where I'm adding the tasks to a list, I'm OK if they are executed in parallel. My concern was queuing up any call to DS. So, in this case, SendModuleDataToDSAsync and SendInstrumentSettingsToDS must be executed in order, asynchronously. I think a queue would work for that. I'm wondering when and where I should invoke "await RunAllTasks()". Should I call it once all the DS calls are added to the queue? Also, would a lock (like SemaphoreSlim) be necessary? – Electrodeposit
@AlexeiLevenkov: I'm trying to implement the queue and I'm stuck on when to invoke await RunAllTasks();. As I mentioned, once I invoke SendModuleDataToDSAsync, somewhere further down the code I need to invoke SendInstrumentSettingsToDS. If I have to call RunAllTasks after the second DS call, then the first DS call, which could have run at the beginning, is made to wait until all DS calls are added to the queue. On the other hand, if I invoke 'await RunAllTasks()' after each DS call is added to the queue, then I'm creating separate threads for each await and my purpose isn't served. Need advice, please! – Electrodeposit
@Electrodeposit the code in the post is just to illustrate how you can put not-yet-started tasks into a queue and start them later in sequence. I'm still not really sure if that is what you want, but if you are really looking for such sequential execution, you need a single consumer that does the equivalent of the RunAllTasks method, pulling items from a thread-safe, preferably async queue like https://mcmap.net/q/329165/-awaitable-task-based-queue. But since you have a real answer from Stephen Cleary with complete code, I'm not sure why you are even playing with this (his answer uses the same Func<Task> ...) – Stradivari
@AlexeiLevenkov: It looks like Stephen Cleary's code needs C# 8, and even your suggestion at the link is for C# 8. Hence, I'm trying to make this work in C# 7 and looking for guidance. – Electrodeposit
A
0

Building on your comment under Alexei's answer, your approach with SemaphoreSlim is correct.

Assuming that the methods SendInstrumentSettingsToDS and SendModuleDataToDSAsync are members of the same class, you simply need an instance variable for a SemaphoreSlim. At the start of each method that needs synchronization, call await _lock.WaitAsync(), and call _lock.Release() in the finally block. (The field is named _lock because lock is a reserved keyword in C# and cannot be used as an identifier.)

// Shared by both methods; an initial count of 1 allows one caller at a time.
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);

public async Task SendModuleDataToDSAsync(Module parameters)
{
    await _lock.WaitAsync();
    try
    {
        ...
    }
    finally
    {
        _lock.Release();
    }
}

private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
{
    await _lock.WaitAsync();
    try
    {
        ...
    }
    finally
    {
        _lock.Release();
    }
}

It is important that the call to _lock.Release() is in the finally block, so that the semaphore is released even if an exception is thrown somewhere in the try block.

Affiliate answered 2/4, 2020 at 8:41 Comment(1)
You can enforce the Release() call with a using statement and Nito.AsyncEx. – Falange
