I have the following function that returns the standard output data, as an async stream, that results from running a System.Diagnostics.Process
. Everything currently in the method works as intended; I can call it in an await foreach()
loop and I get each line of output as its generated by the external exe.
private static async IAsyncEnumerable<string> ProcessAsyncStream (
ProcessStartInfo processStartInfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processStartInfo };
// Buffer used to pass data from event-handler back to this method
BufferBlock<string> dataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null)
{
dataBuffer.Complete();
}
else
{
dataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginOutputReadLine();
// Return data line by line
while (await dataBuffer.OutputAvailableAsync())
yield return dataBuffer.Receive();
}
My problem is that now I need it to return both the standard output and standard error results. I made this class to hold the data from each stream.
public class ProcessData
{
public string Error { get; set; } = "";
public string Output { get; set; } = "";
}
and changed ProcessAsyncStream()
to look like this
private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
ProcessStartInfo processStartInfo)
{
// Ensure that process is destroyed when this method exits
using var process = new Process() { StartInfo = processStartInfo };
// Buffer used to pass data from event-handlers back to this method
BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
BufferBlock<string> errorDataBuffer = new BufferBlock<string>();
process.OutputDataReceived += (s, e) =>
{
if (e.Data is null)
{
outputDataBuffer.Complete();
}
else
{
outputDataBuffer.Post(e.Data);
}
};
process.ErrorDataReceived += (s, e) =>
{
if (e.Data is null)
{
errorDataBuffer.Complete();
}
else
{
errorDataBuffer.Post(e.Data);
}
};
// Start process and redirect output streams
process.Start();
process.BeginOutputReadLine();
// Return data line by line
while (await outputDataBuffer.OutputAvailableAsync()
|| await errorDataBuffer.OutputAvailableAsync())
yield return new ProcessData()
{
Error = errorDataBuffer.Receive(),
Output = outputDataBuffer.Receive()
}
}
The problem is that if either buffer completes before the other than the method hangs up because that buffer's .Receive()
doesn't have any data to receive. If I change the while
condition to &&
then I won't get all the data from the other buffer.
Any suggestions?
Error = errorDataBuffer.OutputAvailableAsync() ? errorDataBuffer.Receive() : null
(and similarly for Output) work for you? – OcotillooutputDataBuffer.OutputAvailableAsync()
twice in the lastwhile
loop. Is this intentional or a bug? – BulgarTryReceive
– RibbonBufferBlock
to anIAsyncEnumerable
is not optimal, and is not safe for multiple consumers. Look here for the correct pattern (theToAsyncEnumerable
extension method). – BulgarOutputDataReceived
andErrorDataReceived
) have been triggered withe.Data is null
? Alternatively you could complete the stream on theExited
event. – Bulgare.Data
's to be null. I want all the data that this process has to offer. – Proteuswhile
to an extension method its all of the sudden safe for multiple consumers? I don't understand this. – ProteusTryReceive
method. In a multiple-consumers scenario it is possible to get anInvalidOperationException
if you callReceive
after awaiting theOutputAvailableAsync
. Also by calling theTryReceive
in awhile
loop you may get better performance in high throughput scenarios, because theOutputAvailableAsync
is relatively expensive. – BulgarBufferBlock
. They useValueTask
s internally, and as a result they are less allocatey. Also propagating astruct ProcessData
instead of aclass
could be beneficial too. – Bulgar