How can I have an async stream return with 2 data sources

I have the following function that runs a System.Diagnostics.Process and returns its standard output as an async stream. Everything currently in the method works as intended; I can call it in an await foreach() loop and I get each line of output as it's generated by the external exe.

private static async IAsyncEnumerable<string> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handler back to this method
   BufferBlock<string> dataBuffer = new BufferBlock<string>();

   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         dataBuffer.Complete();
      }
      else
      {
         dataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginOutputReadLine();

   // Return data line by line  
   while (await dataBuffer.OutputAvailableAsync())
      yield return dataBuffer.Receive();
}

My problem is that now I need it to return both the standard output and standard error results. I made this class to hold the data from each stream.

public class ProcessData
{
   public string Error { get; set; } = "";
   public string Output { get; set; } = "";
}

and changed ProcessAsyncStream() to look like this

private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
    ProcessStartInfo processStartInfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processStartInfo };

   // Buffer used to pass data from event-handlers back to this method
   BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
   BufferBlock<string> errorDataBuffer = new BufferBlock<string>();

   
   process.OutputDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         outputDataBuffer.Complete();
      }
      else
      {
         outputDataBuffer.Post(e.Data);
      }
   };

   process.ErrorDataReceived += (s, e) =>
   {
      if (e.Data is null)
      {
         errorDataBuffer.Complete();
      }
      else
      {
         errorDataBuffer.Post(e.Data);
      }
   };

   // Start process and begin reading both redirected streams
   process.Start();
   process.BeginOutputReadLine();
   process.BeginErrorReadLine();

   // Return data line by line
   while (await outputDataBuffer.OutputAvailableAsync()
          || await errorDataBuffer.OutputAvailableAsync())
      yield return new ProcessData() 
      {
         Error = errorDataBuffer.Receive(), 
         Output = outputDataBuffer.Receive()
      };
}

The problem is that if either buffer completes before the other, then the method hangs because that buffer's .Receive() has no data to receive. If I change the while condition to &&, then I won't get all the data from the other buffer.

Any suggestions?

Proteus answered 4/11, 2020 at 6:12 Comment(13)
Would Error = await errorDataBuffer.OutputAvailableAsync() ? errorDataBuffer.Receive() : null (and similarly for Output) work for you? - Ocotillo
You are checking the outputDataBuffer.OutputAvailableAsync() twice in the last while loop. Is this intentional or a bug? - Bulgar
This is a strange solution you have here. Also, shouldn't you be using TryReceive? - Ribbon
@TheodorZoulias oops, that was just a mistype. I did some format editing in the SO editor and am half asleep and made a mistake. I am not doing that in my real code. - Proteus
@KlausGütter Yes, I believe that would work. I can't believe I didn't think of that. That's what I get for programming while sleep deprived. Thanks - Proteus
@Ribbon Why is it a strange solution? And yes, I probably should be. - Proteus
OK, no problem. Could you fix this mistype? I am not sure how to fix it myself. Btw the pattern you use for converting a BufferBlock to an IAsyncEnumerable is not optimal, and is not safe for multiple consumers. Look here for the correct pattern (the ToAsyncEnumerable extension method). - Bulgar
Regarding the condition of the stream completion, do you want it to complete when both events (OutputDataReceived and ErrorDataReceived) have been triggered with e.Data is null? Alternatively you could complete the stream on the Exited event. - Bulgar
Yes. I want both e.Data's to be null. I want all the data that this process has to offer. - Proteus
@TheodorZoulias So just by moving the while to an extension method it's all of a sudden safe for multiple consumers? I don't understand this. - Proteus
master_ruko no, what makes it safe is the use of the TryReceive method. In a multiple-consumers scenario it is possible to get an InvalidOperationException if you call Receive after awaiting the OutputAvailableAsync. Also, by calling TryReceive in a while loop you may get better performance in high-throughput scenarios, because OutputAvailableAsync is relatively expensive. - Bulgar
@TheodorZoulias Ah, I see. I was unaware of this, as I haven't had that problem yet. - Proteus
As a side note, in case you are interested in performance, Channels are considered superior to the BufferBlock. They use ValueTasks internally, and as a result they allocate less. Also, propagating a struct ProcessData instead of a class could be beneficial too. - Bulgar
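The TryReceive pattern mentioned in the comments above can be sketched roughly as follows. This is not the code from the linked answer (which is not reproduced on this page), only a minimal illustration of the idea: await OutputAvailableAsync once, then drain with TryReceive so that a competing consumer can never make Receive throw. The class name DataflowExtensions and the exact shape of ToAsyncEnumerable are assumptions made for this example.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class; the name is an assumption for this sketch.
public static class DataflowExtensions
{
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this IReceivableSourceBlock<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // OutputAvailableAsync returns false once the block is completed and empty.
        while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            // Drain everything that is currently buffered. TryReceive simply
            // returns false if another consumer took the item first, whereas
            // Receive could throw InvalidOperationException in that case.
            while (source.TryReceive(out T item))
            {
                yield return item;
            }
        }

        // Surface any exception the block faulted with.
        await source.Completion.ConfigureAwait(false);
    }
}
```

With a helper like this, the loop in the question would reduce to an await foreach over dataBuffer.ToAsyncEnumerable(). It does not by itself solve the two-buffer problem; it only makes reading from a single buffer safer.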
R
3

Regarding the actual problem, there is an issue with the flow of reading the blocks: the loop receives from both buffers on every iteration, even when only one of them has data. The easiest solution is to use a single buffer with multiple producers and a single consumer, combined with a message packet that records which stream each line came from.

The conceptual issue you are trying to solve with the DataFlow blocks lies in the fundamental nature of events and async streams: events are pushed, and async streams are pulled.

There are several solutions that would map them together, though I think the most elegant is to use an unbounded Channel as the buffer.

Channels are a more modern approach than DataFlow: they have fewer degrees of freedom, are less clunky than a BufferBlock, and are very lightweight and highly optimized. Additionally, I would pass a wrapper type for the different response types.

Disregarding any other problem (conceptual or otherwise), given

public enum MessageType
{
   Output,
   Error
}

public class Message
{
   public MessageType MessageType { get; set; }
   public string Data { get; set; }

   public Message(string data, MessageType messageType )
   {
      Data = data;
      MessageType = messageType;
   }
}

Usage

private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
     ProcessStartInfo processStartInfo, 
     CancellationToken cancellationToken)
{
   using var process = new Process() { StartInfo = processStartInfo };

   var ch = Channel.CreateUnbounded<Message>();
   var completeCount = 0;

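   void OnReceived(string data, MessageType type)
   {
      if (data is null)
      {
         // A null line marks the end of one stream; complete only after both have ended.
         // The Interlocked memory barrier is likely overkill here
         if (Interlocked.Increment(ref completeCount) == 2)
            ch.Writer.Complete();
      }
      else
      {
         // TryWrite always succeeds on an unbounded channel that is still open
         ch.Writer.TryWrite(new Message(data, type));
      }
   }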

   process.OutputDataReceived += (_, args) => OnReceived(args.Data, MessageType.Output);
   process.ErrorDataReceived += (_, args) => OnReceived(args.Data, MessageType.Error);

   // start the process 
   // ...

   await foreach (var message in ch.Reader
           .ReadAllAsync(cancellationToken)
           .ConfigureAwait(false))
      yield return message;

   // cleanup
   // ...
}

Note: completely untested

Ribbon answered 4/11, 2020 at 8:5 Comment(4)
Can I set the UnboundedChannelOptions.SingleReader option to true in this case? I was reading up on channels, as this is a new thing for me, and read that setting these options can help to optimize performance in some cases. I'm going to try it, cuz I think it's appropriate. I'm just not sure if there's something I'm not aware of that might cause any issues. - Proteus
Also, I've never used a cancellation token. Should I register a delegate to stop the process if the cancellation token is activated? - Proteus
@Proteus Yes, single reader is indeed appropriate here. - Ribbon
@Proteus the token will throw in the channel methods. If you need to clean up on token cancellation, you can either catch the OperationCanceledException or register with the token itself. I think in this circumstance I'd go with catch, or maybe even a finally, to do your clean up and let the exception propagate. - Ribbon
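Putting the two points from these comments together (SingleReader, and cleaning up in a finally while letting the cancellation exception propagate) might look something like the sketch below. It is untested against real workloads, and several things are assumptions made for illustration: the class name ProcessStreams, the method name RunAsync, the choice to kill the process in the finally block, and forcing the redirect settings on the supplied ProcessStartInfo.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public enum MessageType { Output, Error }

public class Message
{
    public MessageType MessageType { get; }
    public string Data { get; }

    public Message(string data, MessageType messageType)
    {
        Data = data;
        MessageType = messageType;
    }
}

// Hypothetical wrapper; names are assumptions for this sketch.
public static class ProcessStreams
{
    public static async IAsyncEnumerable<Message> RunAsync(
        ProcessStartInfo startInfo,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Assumption: the caller wants both streams redirected.
        startInfo.UseShellExecute = false;
        startInfo.RedirectStandardOutput = true;
        startInfo.RedirectStandardError = true;

        using var process = new Process { StartInfo = startInfo };

        // Two event handlers write, only this method reads.
        var channel = Channel.CreateUnbounded<Message>(
            new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
        var completeCount = 0;

        void OnReceived(string data, MessageType type)
        {
            if (data is null)
            {
                // Each stream sends one null line when it closes.
                if (Interlocked.Increment(ref completeCount) == 2)
                    channel.Writer.TryComplete();
            }
            else
            {
                channel.Writer.TryWrite(new Message(data, type));
            }
        }

        process.OutputDataReceived += (_, e) => OnReceived(e.Data, MessageType.Output);
        process.ErrorDataReceived += (_, e) => OnReceived(e.Data, MessageType.Error);

        process.Start();
        process.BeginOutputReadLine();
        process.BeginErrorReadLine();

        try
        {
            await foreach (var message in channel.Reader
                .ReadAllAsync(cancellationToken)
                .ConfigureAwait(false))
            {
                yield return message;
            }
        }
        finally
        {
            // Runs on normal completion, on cancellation, and when the consumer
            // stops enumerating early. The exception itself is not swallowed.
            if (!process.HasExited)
                process.Kill();
        }
    }
}
```

A try/finally is used rather than try/catch because C# does not allow yield return inside a try block that has a catch clause. Whether killing the process is the right cleanup depends on the external program; registering a callback on the token is the alternative mentioned above.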
T
1

Complete on exit instead.

void HandleData(object sender, DataReceivedEventArgs e)
{
    if (e.Data != null) dataBuffer.Post(e.Data);
}

process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;
process.Exited += (s,e) => 
{
    process.WaitForExit();
    dataBuffer.Complete();
};
Tomikotomkiel answered 4/11, 2020 at 8:57 Comment(13)
While I'm sure that Process always raises events in the expected order, I see no reason to expect the Windows thread scheduler to guarantee that the Exited event is always raised after the last HandleData() call has been made. These events are raised asynchronously in the thread pool, and not synchronized with each other, so it's always possible that there will be another call to HandleData() after the Exited event is raised. - Formerly
In other words, I see why this approach would be tempting, but it seems like one of the worst options (not even counting that the OP keeps the stdout and stderr data streams separated, while the above does not provide for that...that seems relatively simple to fix though). - Formerly
I did an experiment and verified Peter Duniho's comment. The Exited event is indeed invoked before all handlers of the OutputDataReceived and ErrorDataReceived events have completed. The documentation indicates that calling the WaitForExit method should be sufficient to ensure that all handlers have completed, and my experiment confirmed that. So an easy fix could be to call WaitForExit before completing the buffer (inside the Exited event handler). - Bulgar
@TheodorZoulias Thanks. Revised the code per your suggestion. - Tomikotomkiel
Unfortunately, the suggestion from @TheodorZoulias does not work. The documentation hints at this, though is not 100% explicit: "ensures that all processing has been completed, including the handling of asynchronous events". The WaitForExit() method waits for all event handlers to return, including the Exited handler. If you call it from that handler (or any other), the process will deadlock. - Formerly
@PeterDuniho good point. While doing my tests I didn't experience any deadlocks, which indicates that the handlers of the Exited event may not affect the behavior of WaitForExit. But it should be easy to remove this risk factor, by offloading the WaitForExit()+buffer.Complete() to another thread (with Task.Run). This way the Exited handler will not be blocked. - Bulgar
@TheodorZoulias: well, I did test it, and having a call for WaitForExit() in the Exited handler causes deadlock, at least on .NET Framework. Yes, pushing that off to the thread pool can solve the deadlock issue, but it further complicates the solution. The primary appeal of the code above is its simplicity; the channels approach is IMHO better overall, and once the above has been modified to solve all the problems (e.g. having to call WaitForExit(), in a task, and fixing the handlers so that out and err are distinguished), you get something not dissimilar to the channels approach. - Formerly
@PeterDuniho my tests were on .NET Core 3.1.3 (no deadlock). I just tested on .NET Framework 4.8.3801, and didn't observe a deadlock either. It does hate calling WaitForExit twice from different threads though. One of the calls throws a random exception. Indeed this complication reduces the appeal of this solution. But choosing a Channel over a BufferBlock makes no difference regarding the main issue, which is finding the correct moment to mark the completion of the async queue. Depending on receiving null e.Data from the output/error handlers is not terribly appealing either. - Bulgar
@TheodorZoulias: your concern about the null values is unfounded. That's well-defined behavior. The event handlers are never going to be called more than once with null. The null specifically marks the end of the stream, and thus the last time the handler will be called. - Formerly
@PeterDuniho this behavior is well-defined experimentally, but is it documented? I haven't seen it described in the documentation so far. - Bulgar
@TheodorZoulias: The doc is not explicit, granted. But that doesn't mean that that's not what the documentation says. The documentation says that "When the redirected stream is closed, a null line is sent to the event handler". It doesn't say that null is sent any other time. It's only sent when the stream is closed. How many times do you think the stream is closed? It would be pretty silly to expect the documentation to always call out all the situations when something doesn't happen. - Formerly
@PeterDuniho OK, agreed. This excerpt from the documentation is convincing enough for me. :-) - Bulgar
@PeterDuniho to be fair though, having to count the nulls coming from the two events in order to signal the completion of the combined stream (like the General does in their answer) looks a bit sketchy. It makes me think that the Process class lacks a proper mechanism for awaiting its completion, something like WaitForExit but asynchronous. The Exited event, although advertised as such, is not equivalent because it is triggered prematurely. - Bulgar
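One way to read the Task.Run suggestion from this thread is to drop the Exited handler entirely and call the blocking WaitForExit() on a thread-pool thread right after starting the process, completing the buffer once it returns. The sketch below does that with a Channel instead of a BufferBlock and, like the answer above, merges both streams into one. The names MergedProcessOutput and ReadAllLinesAsync are assumptions for this example, and it relies on the documented behavior quoted above that the parameterless WaitForExit() returns only after the asynchronous output handlers have finished.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper; names are assumptions for this sketch.
public static class MergedProcessOutput
{
    public static async IAsyncEnumerable<string> ReadAllLinesAsync(ProcessStartInfo startInfo)
    {
        // Assumption: the caller wants both streams redirected.
        startInfo.UseShellExecute = false;
        startInfo.RedirectStandardOutput = true;
        startInfo.RedirectStandardError = true;

        using var process = new Process { StartInfo = startInfo };
        var channel = Channel.CreateUnbounded<string>();

        // Same handler for both streams: the origin of each line is lost here.
        void HandleData(object sender, DataReceivedEventArgs e)
        {
            if (e.Data != null) channel.Writer.TryWrite(e.Data);
        }

        process.OutputDataReceived += HandleData;
        process.ErrorDataReceived += HandleData;

        process.Start();
        process.BeginOutputReadLine();
        process.BeginErrorReadLine();

        // WaitForExit() blocks, so it runs on the thread pool rather than
        // inside any Process event handler. It is called exactly once.
        Task waiter = Task.Run(() =>
        {
            process.WaitForExit();
            channel.Writer.Complete();
        });

        await foreach (var line in channel.Reader.ReadAllAsync().ConfigureAwait(false))
        {
            yield return line;
        }

        // Observe any exception thrown by the waiting task.
        await waiter.ConfigureAwait(false);
    }
}
```

A caveat: if the consumer stops enumerating early, the process is disposed while the background task may still be waiting, so a production version would need extra cleanup. As the comments point out, by the time that is handled the result is not much simpler than the channel-plus-null-counting approach.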
B
0

You could use a single buffer of ProcessData items:

var buffer = new BufferBlock<ProcessData>();

Then use a custom Complete mechanism to complete the buffer when both events have propagated a null value:

process.OutputDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(1);
    else buffer.Post(new ProcessData() { Output = e.Data });
};

process.ErrorDataReceived += (s, e) =>
{
    if (e.Data is null) Complete(2);
    else buffer.Post(new ProcessData() { Error = e.Data });
};

Here is an implementation of the Complete method:

bool[] completeState = new bool[2];
void Complete(int index)
{
    bool completed;
    lock (completeState.SyncRoot)
    {
        completeState[index - 1] = true;
        completed = completeState.All(v => v);
    }
    if (completed) buffer.Complete();
}
Bulgar answered 4/11, 2020 at 7:7 Comment(0)
