Network Command Processing with TPL Dataflow
I'm working on a system that involves accepting commands over a TCP network connection, then sending responses upon execution of those commands. Fairly basic stuff, but I'm looking to support a few requirements:

  1. Multiple clients can connect at the same time and establish separate sessions. Sessions can last as long or as short as desired, with the same client IP able to establish multiple parallel sessions, if desired.
  2. Each session can process multiple commands at the same time, as some of the requested operations can be performed in parallel.

I'd like to implement this cleanly using async/await and, based on what I've read, TPL Dataflow sounds like a good way to break the processing into chunks that run on the thread pool, instead of tying up a thread per session/command, blocking on wait handles.

This is what I'm starting with (some parts stripped out to simplify, such as details of exception handling; I've also omitted a wrapper that provides an efficient awaitable for the network I/O):

    private readonly Task _serviceTask;
    private readonly Task _commandsTask;
    private readonly CancellationTokenSource _cancellation;
    private readonly BufferBlock<Command> _pendingCommands;

    public NetworkService(ICommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
        IsRunning = true;
        _cancellation = new CancellationTokenSource();
        _pendingCommands = new BufferBlock<Command>();
        _serviceTask = Task.Run((Func<Task>)RunService);
        _commandsTask = Task.Run((Func<Task>)RunCommands);
    }

    public bool IsRunning { get; private set; }

    private async Task RunService()
    {
        _listener = new TcpListener(IPAddress.Any, ServicePort);
        _listener.Start();

        while (IsRunning)
        {
            Socket client = null;
            try
            {
                client = await _listener.AcceptSocketAsync();
                client.Blocking = false;

                var session = RunSession(client);
                lock (_sessions)
                {
                    _sessions.Add(session);
                }
            }
            catch (Exception ex)
            { //Handling here...
            }
        }
    }

    private async Task RunCommands()
    {
        while (IsRunning)
        {
            var command = await _pendingCommands.ReceiveAsync(_cancellation.Token);
            var task = Task.Run(() => RunCommand(command));
        }
    }

    private async Task RunCommand(Command command)
    {
        try
        {
            var response = await _commandProcessor.RunCommand(command.Content);
            Send(command.Client, response);
        }
        catch (Exception ex)
        {
            //Deal with general command exceptions here...
        }
    }

    private async Task RunSession(Socket client)
    {
        while (client.Connected)
        {
            var reader = new DelimitedCommandReader(client);

            try
            {
                var content = await reader.ReceiveCommand();
                _pendingCommands.Post(new Command(client, content));
            }
            catch (Exception ex)
            {
                //Exception handling here...
            }
        }
    }

The basics seem straightforward, but one part is tripping me up: how do I make sure that when I'm shutting down the application, I wait for all pending command tasks to complete? I get the Task object when I use Task.Run to execute the command, but how do I keep track of pending commands so that I can make sure that all of them are complete before allowing the service to shut down?

I've considered using a simple List, with removal of commands from the List as they finish, but I'm wondering if I'm missing some basic tools in TPL Dataflow that would allow me to accomplish this more cleanly.
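For reference, the List-based tracking I'm describing would look roughly like this (a sketch only; the locking, the `ContinueWith` cleanup, and the shutdown snapshot are my additions, not code I have working):

```csharp
private readonly List<Task> _runningCommands = new List<Task>();

// When a command arrives, track its task and remove it once it finishes.
var task = Task.Run(() => RunCommand(command));
lock (_runningCommands)
{
    _runningCommands.Add(task);
}
task.ContinueWith(t =>
{
    lock (_runningCommands)
    {
        _runningCommands.Remove(t);
    }
});

// At shutdown, wait on a snapshot of whatever is still running.
Task[] pending;
lock (_runningCommands)
{
    pending = _runningCommands.ToArray();
}
Task.WaitAll(pending);
```

It works, but the bookkeeping is exactly the kind of thing I'm hoping TPL Dataflow handles for me.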


EDIT:

Reading more about TPL Dataflow, I'm wondering if what I should be using is a TransformBlock with an increased MaxDegreeOfParallelism to allow processing parallel commands? This sets an upper limit on the number of commands that can run in parallel, but that's a sensible limitation for my system, I think. I'm curious to hear from those who have experience with TPL Dataflow to know if I'm on the right track.
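For concreteness, here's a rough sketch of what I have in mind (the degree-of-parallelism value is arbitrary, and the `Response` type name is illustrative; `Command` and `_commandProcessor` are from the code above):

```csharp
// Sketch: a TransformBlock that executes up to 4 commands concurrently.
// Additional commands queue inside the block until a slot frees up.
var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,
    CancellationToken = _cancellation.Token
};

var commandBlock = new TransformBlock<Command, Response>(
    command => _commandProcessor.RunCommand(command.Content),
    options);
```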

Maestro answered 31/1, 2014 at 16:32 Comment(5)
I strongly recommend you use a self-hosted WebAPI instead of a TCP/IP server, unless you're talking to embedded machines that don't have an HTTP client component. Bare TCP/IP has tons of pitfalls... Socket.Connected cannot be used the way you're trying to use it here. You need to send periodic data even if you never receive commands to avoid the half-open problem. Shutdown is always difficult to get right. In short, it's just really hard to write a correct TCP/IP server. – Cinchonism
@StephenCleary, I'm integrating with a robot that only natively supports a raw TCP protocol. I agree that there are challenges with this, and this snippet doesn't include all the exception handling for the various disconnect cases, nor the session timeout handling. – Maestro
What are you using Socket.Blocking = false for? I haven't seen a good use for non-blocking sockets in a long time. You are using async I/O after all, which takes care of the scalability issue completely. – Choreographer
@usr, actually, you're right, I don't need that Blocking = false call, since I'm using ReceiveAsync on the Socket, which ignores the Blocking property. That was a leftover from how I had originally implemented my Socket operations before trying a cleaner, async approach. – Maestro
@DanBryant, ah, I see. I wasn't even familiar with that behavior. It reminds me of what was considered modern socket programming on Unix 15 years ago. Obsolete stuff. – Choreographer
Yeah, so... you're kinda half using the power of TPL Dataflow here. The fact that you're still manually receiving items from the BufferBlock in your own while loop on a background Task is not the "way" you want to do it if you're subscribing to the TPL Dataflow style.

What you would do is link an ActionBlock to the BufferBlock and do your command processing/sending from within that. This is also the block where you would set the MaxDegreeOfParallelism to control just how many concurrent commands you want to process. So that setup might look something like this:

// Initialization logic to build up the TPL flow
_pendingCommands = new BufferBlock<Command>();
// Give the block its own field name so it doesn't collide with the
// injected ICommandProcessor (_commandProcessor) from your question.
_commandBlock = new ActionBlock<Command>(this.ProcessCommand);

// PropagateCompletion makes completing the BufferBlock automatically
// complete the ActionBlock once the buffer has drained.
_pendingCommands.LinkTo(
    _commandBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

private async Task ProcessCommand(Command command)
{
    var response = await _commandProcessor.RunCommand(command.Content);
    this.Send(command.Client, response);
}

Then, in your shutdown code, you would need to signal that you're done adding items into the pipeline by calling Complete on the _pendingCommands BufferBlock, and then wait on the ActionBlock to complete to ensure that all items have made their way through the pipeline. Because the link above propagates completion, you do this by grabbing the Task returned by the block's Completion property and calling Wait on it:

_pendingCommands.Complete();
_commandBlock.Completion.Wait();

If you want to go for bonus points, you can even separate the command processing from the command sending. This would allow you to configure those steps separately from one another. For example, maybe you need to limit the number of threads processing commands, but want to have more sending out the responses. You would do this by simply introducing a TransformBlock into the middle of the flow:

_pendingCommands = new BufferBlock<Command>();
_commandBlock = new TransformBlock<Command, Tuple<Client, Response>>(this.ProcessCommand);
_sendBlock = new ActionBlock<Tuple<Client, Response>>(this.SendResponseToClient);

// Propagate completion down the whole chain so that completing the
// BufferBlock eventually completes the sending block as well.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
_pendingCommands.LinkTo(_commandBlock, linkOptions);
_commandBlock.LinkTo(_sendBlock, linkOptions);

private async Task<Tuple<Client, Response>> ProcessCommand(Command command)
{
    var response = await _commandProcessor.RunCommand(command.Content);

    return Tuple.Create(command.Client, response);
}

private void SendResponseToClient(Tuple<Client, Response> clientAndResponse)
{
    this.Send(clientAndResponse.Item1, clientAndResponse.Item2);
}

You probably want to use your own data structure instead of Tuple; that was just for illustrative purposes. But the point is that this is exactly the kind of structure you want to use to break up the pipeline so that you can control its various aspects exactly how you might need to.
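For example, a minimal carrier type might look like the following (the ClientResponse name is just illustrative; Client and Response are the placeholder types used above):

```csharp
// Sketch: a small immutable type to replace Tuple<Client, Response>
// in the pipeline, so Item1/Item2 become named properties.
public sealed class ClientResponse
{
    public ClientResponse(Client client, Response response)
    {
        Client = client;
        Response = response;
    }

    public Client Client { get; private set; }
    public Response Response { get; private set; }
}
```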

Redfaced answered 31/1, 2014 at 22:21 Comment(1)
This is very helpful; I'm quite new to Dataflow, but it looks to be quite useful. It's surprising how awkward network I/O can still be, even in the era of async/await. I went ahead and simplified it to a single ActionBlock (since it already has buffering) and preliminary unit tests look promising. – Maestro
Tasks are background operations by default, which means that when the application terminates, they are immediately terminated as well. You should use a Thread, not a Task. Then you can set:

Thread.IsBackground = false;

This will prevent your application from terminating while the worker thread is running, although it will of course require some changes to your code above.

What's more, when executing the shutdown method, you could also just wait for any outstanding tasks from the main thread.

I do not see a better solution to this.

Tiffanitiffanie answered 31/1, 2014 at 16:35 Comment(2)
This unfortunately defeats the purpose of what I'm trying to accomplish, which is to allow efficient partitioning of tasks onto existing thread pool threads, rather than creating new threads as commands arrive. – Maestro
Also, this is a process-global solution. What if he has multiple independent NetworkService instances running? – Choreographer
