What is the best way to wait on a network packet using C#'s new async feature
Asked Answered
C

3

7

I've recently been playing around with the new Async CTP, and I've come across a situation where I'm not sure how to proceed.

In my current code base, I'm using a concept of "jobs" and a "job manager". Jobs exist solely for the purpose of handling an initial message, sending a response, and then waiting the response.

I already have existing code based around synchronous sockets, where a network thread is waiting on data to arrive, and then passing it along to an event handler, and eventually to the job manager.

The job manager looks for what job would handle the message, and passes it along.

So the scenario is this:

  1. Job manager gets a new message and launches a job.
  2. The job starts, processes the message, and sends a reply message.
  3. At this point the job would wait for a response to the reply.

Here's a pseudocode example:

class MyJob : Job
{
    public override void RunJob( IPacketMsg packet )
    {
        // handle packet

        var myReply = new Packet();
        SendReply( myReply );

        await GetResponse();
    }
}

But I'm not entirely sure how to proceed at step 3. The job manager will get the response and then hand it along to the running job. But I'm not sure how to make the job wait for the response.

I've considered creating an awaited Task that simply blocks on a WaitHandle, but is this the best solution?

Are there any other things I could do in this case?

Edit On the subject of the Async CTP, what happens in a situation where the UI is not being used. I've read over Eric Lippert's Async blog, but I don't believe it ever touched on the subject of how everything works in the background without a UI thread (does it spin off a background worker or...?)

Coaxial answered 12/9, 2011 at 19:54 Comment(5)
So you want this thread to block while waiting for GetResponse? If so, then await isnt the pattern you need to be using. You would just do GetResponse().Wait().Valdavaldas
@Tejs: No, I don't want the network thread to block, I just want this job's function to block while waiting for a response. Other jobs need to be able to start while this job is waiting for it's response. Essentially I'm looking for something coroutine-like with the ability to yield this job's execution while the response hasn't arrived.Coaxial
Give me a use case here - you want to execute RunJob, which will start SendReply, and you want the body of GetResponse to wait until SendReply finishes, but without blocking the thread?Valdavaldas
SendReply will finish synchronously. When it finishes I assume the remote party I'm communicating with has recieved my reply and is processing it. I want GetResponse to block (but without blocking the whole network thread) until the the remote end has sent me a response to my reply.Coaxial
I did not touch on that, true. But I happen to know that October's MSDN magazine will have an article on how the asynchronous context works behind the scenes, so watch out for that.Abad
O
5
  1. Job manager gets a new message and launches a job.
  2. The job starts, processes the message, and sends a reply message.
  3. At this point the job would wait for a response to the reply.

First off, I should mention that the Async CTP handles asynchronous operations very well, but asynchronous events not so much. You may want to consider an Rx-based approach. But let's proceed for the moment with the Async CTP.

You have two basic options to create Tasks:

  • With a delegate. e.g., Task.Factory.StartNew will run a delegate on the thread pool. Custom task factories and schedulers give you more options for task delegates (e.g., specifying the delegate must be run on an STA thread).
  • Without a delegate. e.g., TaskFactory.FromAsync wraps an existing Begin/End method pair, TaskEx.FromResult returns a "future constant", and TaskCompletionSource can be used to control a Task explicitly (both FromAsync and FromResult use TCS internally).

If the job processing is CPU-bound, it makes sense to pass it off to Task.Factory.StartNew. I'm going to assume the job processing is CPU-bound.

Job manager pseudo-code:

// Responds to a new message by starting a new job on the thread pool.
private void RespondToNewMessage(IPacketMsg message)
{
  IJob job = ..;
  Task.Factory.StartNew(job.RunJob(message));
}

// Holds tasks waiting for a response.
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;

// Asynchronously gets a response for the specified reply.
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
  var tcs = new TaskCompletionSource<IResponse>();
  responseTasks.Add(replyId, tcs);
  return tcs.Task;
}

// Responds to a new response by completing and removing its task.
private void RespondToResponse(IResponse response)
{
  var tcs = responseTasks[response.ReplyId];
  responseTasks.Remove(response.ReplyId);
  tcs.TrySetComplete(response);
}

The idea is that the job manager also manages a list of oustanding responses. In order for this to happen, I introduced a simple int reply identifier that the job manager can use to determine which response goes with which reply.

Now jobs can work like this:

public override void RunJob(IPacketMsg packet)
{
  // handle packet
  var myReply = new Packet();
  var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
  SendReply(myReply);

  await response;
}

There's a few tricky things since we're placing the jobs on the thread pool thread:

  1. GetResponseForReplyAsync must be invoked (registering the task) before the reply is sent, and is then awaited later. This is to avoid the situation where a reply may be sent and a response received before we have a chance to register for it.
  2. RespondToResponse will remove the task registration before completing it, just in case completing the task causes another reply to be sent with the same id.

If the jobs are short enough that they don't need to be placed on the thread pool thread, then the solution can be simplified.

Orson answered 12/9, 2011 at 21:2 Comment(2)
Thanks for the thorough response. However your example code brought up something else that I wondered about. I could refactor my code to make the whole job execute in a Task, but I'm worried that I may exhaust the threadpool when many messages come in at one time. Additionally, not every job will have to wait for a response, some will just handle the message that arrives and then finish. Having these types of jobs finish synchronously is acceptable, as no heavy processing will occur. The ones that require a response will have their time spent mostly waiting for the response.Coaxial
Remember that not every Task runs on the threadpool - in fact, some don't even have a delegate. Instead of passing RunJob to StartNew, you could have a virtual Task RunJobAsync() method in your base class. Derived classes could implement it either by calling StartNew (using the threadpool), or by an async method (asynchronous, using the threadpool only for continuations), or by calling TaskEx.FromResult (synchronous). This last approach is called the fast path.Orson
O
3

On the subject of the Async CTP, what happens in a situation where the UI is not being used. I've read over Eric Lippert's Async blog, but I don't believe it ever touched on the subject of how everything works in the background without a UI thread (does it spin off a background worker or...?)

await will return to its synchronization context. In a UI process, this is a UI message loop. In ASP.NET, this is the ASP.NET thread pool. In other situations (Console applications and Win32 services), there is no context, so continuations are queued to the ThreadPool. This is not usually desired behavior, so I wrote an AsyncContext class that can be used in those situations.

BackgroundWorker is not used. In a server-side scenario such as yours, it's not uncommon to not have a background thread at all.

Orson answered 12/9, 2011 at 20:17 Comment(0)
V
1

You would simply wire up the rest of your event handler with the await pattern like so:

 public async void RunJob(IPacketMsg msg)
 {
     // Do Stuff

     var response = await GetResponse();

     // response is "string", not "Task<string>"

     // Do More Stuff
 }

 public Task<string> GetResponse()
 {
     return Task.Factory.StartNew(() =>
        {
             _networkThingy.WaitForDataAvailable();

             return _networkThingy.ResponseString;
        });
 }

When your get response task finishes, the rest of the method picks up execution on your current synchronization context. Until then, however, your method execution is yielded (so any code after the wait is not run until the task started in GetResponse finishes)

Valdavaldas answered 12/9, 2011 at 20:24 Comment(1)
Essentially this boils down to having the task block for the data. In my case it would be a WaitHandle member in the job that the job manager would set to tell the job that the packet arrived. Is that optimal? Or should I be using something like Monitor.Wait/Pulse, or are those two essentially the same?Coaxial

© 2022 - 2024 — McMap. All rights reserved.