.NET Asynchronous stream read/write

I have been trying to solve this "Concurrent Programming" exam exercise (in C#):

Knowing that Stream class contains int Read(byte[] buffer, int offset, int size) and void Write(byte[] buffer, int offset, int size) methods, implement in C# the NetToFile method that copies all data received from NetworkStream net instance to the FileStream file instance. To do the transfer, use asynchronous reads and synchronous writes, avoiding one thread to be blocked during read operations. The transfer ends when the net read operation returns value 0. To simplify, it is not necessary to support controlled cancel of the operation.

void NetToFile(NetworkStream net, FileStream file);

I've been working on this exercise, but I'm unsure how to interpret part of the problem statement. First, here is my code:

public static void NetToFile(NetworkStream net, FileStream file) {
    byte[] buffer = new byte[4096]; // 4 kB buffer, reused on every cycle
    int nBytesRead = 0; // number of bytes read on each cycle

    IAsyncResult ar;
    do {
        // read partial content of net (asynchronously), always into the start of the buffer
        ar = net.BeginRead(buffer,0,buffer.Length,null,null);
        // wait until read is completed
        ar.AsyncWaitHandle.WaitOne();
        // get number of bytes read on each cycle
        nBytesRead = net.EndRead(ar);

        // write partial content to file (synchronously);
        // the FileStream tracks its own position, so no running offset is needed
        file.Write(buffer,0,nBytesRead);
    }
    while( nBytesRead > 0);
}

My doubt comes from this part of the problem statement:

To do the transfer, use asynchronous reads and synchronous writes, avoiding one thread to be blocked during read operations

I'm not really sure if my solution accomplishes what is wanted in this exercise, because I'm using AsyncWaitHandle.WaitOne() to wait until the asynchronous read completes.

On the other hand, I can't work out what a "non-blocking" solution would look like here, since the FileStream write has to be synchronous. To do that, I have to wait until the NetworkStream read completes before writing to the FileStream, don't I?

Can you, please, help me out with this?


[ EDIT 1 ] Using callback solution

OK, if I understood what Mitchel Sellers and willvv replied, I've been advised to use a callback method to turn this into a "non-blocking" solution. Here is my code, then:

static byte[] buffer; // buffer (static so the static methods below can reach it)

public static void NetToFile(NetworkStream net, FileStream file) {
    // fixed-size buffer; file.Length would be 0 for a newly created file
    buffer = new byte[4096];
    //start asynchronous read
    net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}

//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
    //NetworkStream retrieve
    NetworkStream net = (NetworkStream) ar.AsyncState;
    //get number of bytes read
    int nBytesRead = net.EndRead(ar);

    //write content to file
    //... and now, how do I write to FileStream instance without
    //having its reference??
    //fs.Write(buffer,0,nBytesRead);
}

As you may have noticed, I'm stuck on the callback method, as I don't have a reference to the FileStream instance where I want to invoke the "Write(...)" method.

Additionally, this is not a thread-safe solution, as the byte[] field is exposed and may be shared among concurrent NetToFile invocations. I don't know how to solve this problem without exposing this byte[] field in the outer scope, and I'm almost sure it should not be exposed this way.

I don't want to use a lambda or anonymous method solution, because that's not in the curriculum of the "Concurrent Programming" course.

Roundtree answered 8/10, 2009 at 21:41 Comment(0)

You are going to need to use the callback from the NetworkStream read to handle this. Frankly, it might be easier to wrap the copying logic in its own class so that you can hold on to the active stream instances.

This is how I'd approach it (not tested):

public class Assignment1
{
    public static void NetToFile(NetworkStream net, FileStream file) 
    {
        var copier = new AsyncStreamCopier(net, file);
        copier.Start();
    }

    public static void NetToFile_Option2(NetworkStream net, FileStream file) 
    {
        var completedEvent = new ManualResetEvent(false);

        // copy as usual but listen for completion
        var copier = new AsyncStreamCopier(net, file);
        copier.Completed += (s, e) => completedEvent.Set();
        copier.Start();

        completedEvent.WaitOne();
    }

    /// <summary>
    /// The Async Copier class reads the input Stream Async and writes Synchronously
    /// </summary>
    public class AsyncStreamCopier
    {
        public event EventHandler Completed;

        private readonly Stream input;
        private readonly Stream output;

        private byte[] buffer = new byte[4096];

        public AsyncStreamCopier(Stream input, Stream output)
        {
            this.input = input;
            this.output = output;
        }

        public void Start()
        {
            GetNextChunk();
        }

        private void GetNextChunk()
        {
            input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
        }

        private void InputReadComplete(IAsyncResult ar)
        {
            // input read asynchronously completed
            int bytesRead = input.EndRead(ar);

            if (bytesRead == 0)
            {
                RaiseCompleted();
                return;
            }

            // write synchronously
            output.Write(buffer, 0, bytesRead);

            // get next
            GetNextChunk();
        }

        private void RaiseCompleted()
        {
            if (Completed != null)
            {
                Completed(this, EventArgs.Empty);
            }
        }
    }
}
Torrefy answered 8/10, 2009 at 22:12 Comment(2)
Thank you very much for your reply, and sorry for my previous comment. I didn't analyze your reply with the attention it demands. Once I did, I noticed you're suggesting a "blocking" and a "non-blocking" solution. Unfortunately, your "non-blocking" solution seems a bit complex, as the exam only asks to implement the "NetToFile" method and the question is worth 3.5 of 20 total exam points. I really think it must be simpler to solve, without the need to create additional types or expose fields (references to streams and buffer, etc). I'm going to ask for my teacher's help on this. - Roundtree
With your synchronous write, if the output stream is very slow for some reason, could you not miss receiving some of the input while waiting for the output to finish writing? - Rodie

Even though it goes against the grain to help people with their homework, given that this is more than a year old, here's the proper way to accomplish this. All you need to do is overlap your read and write operations; no spawning of additional threads or anything else is required.

public static class StreamExtensions
{
    private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
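    // Note: on .NET 4.0 and later, Stream already has instance CopyTo methods (and CopyToAsync since 4.5).
    // Instance methods take precedence over extension methods, so rename these methods
    // (or call them as StreamExtensions.CopyTo(...)) to be sure this code runs there.
    public static void CopyTo( this Stream input , Stream output )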
    {
        input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ;
        return ;
    }
    public static void CopyTo( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException(   "input must be open for reading"  );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 }                                       ;
        int          bufno = 0 ;
        IAsyncResult read  = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
        IAsyncResult write = null ;

        while ( true )
        {

            // wait for the read operation to complete
            read.AsyncWaitHandle.WaitOne() ; 
            bufl[bufno] = input.EndRead(read) ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break ;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                write.AsyncWaitHandle.WaitOne() ;
                output.EndWrite(write) ;
            }

            // start the new write operation
            write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            write.AsyncWaitHandle.WaitOne() ;
            output.EndWrite(write) ;
        }

        output.Flush() ;

        // return to the caller ;
        return ;
    }


    public static async Task CopyToAsync( this Stream input , Stream output )
    {
        await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ;
        return;
    }

    public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 } ;
        int          bufno = 0 ;
        Task<int>    read  = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
        Task         write = null ;

        while ( true )
        {

            // wait for the read operation to complete and take its byte count
            bufl[bufno] = await read ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                await write ;
            }

            // start the new write operation
            write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            await write;
        }

        await output.FlushAsync();

        // return to the caller ;
        return;
    }

}

Cheers.

Perforce answered 9/11, 2010 at 22:38 Comment(11)
None of the AsyncWaitHandle.WaitOne() calls seem to be needed. My tests show that async does not help. Results for file -> localhost network, 1T of data (32K buffer): async: 104 109 108 100 111 108 109 MB/s; sync: 95 98 111 124 104 117 117 MB/s - Zela
The call to WaitOne() is needed as there is no guarantee that the pending write operation has completed: you must wait for the asynchronous write to complete before calling EndWrite(). It doesn't surprise me that the synchronous performance is just as good as the asynchronous approach: if the i/o library is well-designed, it should have a quality buffering strategy by default and the physical i/o operations should be pretty well disconnected from the logical i/o operations. - Perforce
@Nicholas WaitOne() is not needed. MSDN: "This method [EndWrite] blocks until the I/O operation has completed" - Zela
This code works like a charm. There is another piece of code on SO that does the same thing and is a bit more readable. Check out the answer by Thomas Levesque in this question: https://mcmap.net/q/52983/-is-stream-copy-piped - Roussillon
It's an elegant function, but the calling thread is blocked until completion. For truly non-blocking Stream copy, see e.g. #13624272 - Goodrich
I used Nicholas' code and converted it to use async/await. It does not block the UI and is more than fast enough for my purposes. The main thing I needed was to show transfer progress in the UI. - Mulholland
How did you go about converting it to async/await? I also need to show the progress. - Bren
@Webfort: I just added overloads for CopyToAsync(), completely untested, but they ought to work. Let me know if they don't. For production use, some error/exception checking/handling and logging might not be a bad idea. - Perforce
@NicholasCarey Thanks Nicholas, that worked, plus I could track the progress as well. - Bren
Why did you make it more complicated than it needs to be? The Read is completed synchronously; there's no need to call the async version or manipulate all that extra state. - Epexegesis
Because this is single-threaded code. The reason for making both read and write asynchronous is so they can overlap: the write operation for block n is in flight while we wait for the read of block n+1 to complete. Feel free to add your own solution. - Perforce

I doubt this is the fastest code (there's some overhead from the .NET Task abstraction), but I do think it's a cleaner approach to the whole async copy thing.

I needed a CopyTransformAsync where I could pass a delegate to do something as chunks passed through the copy operation, e.g. compute a message digest while copying. That's why I got interested in rolling my own option.

Findings:

  • CopyToAsync is sensitive to bufferSize (a large buffer is required)
  • FileOptions.Asynchronous makes it horrendously slow (not sure exactly why that is)
  • The bufferSize of the FileStream objects can be smaller (it's not that important)
  • The Serial test is clearly the fastest and most resource intensive

Here's what I've found and the complete source code for the program I used to test this. On my machine, these tests were run on an SSD and are the equivalent of a file copy. Normally you wouldn't use this just for copying files; it's when you have a network stream (which is my use case) that you'd want something like this.

4K buffer

Serial...                                in 0.474s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    timed out
CopyTransformAsync (Asynchronous)...     timed out

8K buffer

Serial...                                in 0.344s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 1.116s
CopyTransformAsync (Asynchronous)...     timed out

40K buffer

Serial...                                in 0.195s
CopyToAsync...                           in 0.624s
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 0.378s
CopyTransformAsync (Asynchronous)...     timed out

80K buffer

Serial...                                in 0.190s
CopyToAsync...                           in 0.355s
CopyToAsync (Asynchronous)...            in 1.196s
CopyTransformAsync...                    in 0.300s
CopyTransformAsync (Asynchronous)...     in 0.886s

160K buffer

Serial...                                in 0.432s
CopyToAsync...                           in 0.252s
CopyToAsync (Asynchronous)...            in 0.454s
CopyTransformAsync...                    in 0.447s
CopyTransformAsync (Asynchronous)...     in 0.555s

Here you can see the Process Explorer performance graph as the test is run. Basically, each peak (in the lowest of the three graphs) is the start of the serial test. You can clearly see how the throughput increases dramatically as the buffer size grows. It appears to plateau somewhere around 80K, which is what the .NET Framework CopyToAsync method uses internally.

[Figure: Performance Graph]

The nice thing here is that the final implementation wasn't that complicated:

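// bufferSize is defined elsewhere in the original test program; 80K here is an assumed value
const int bufferSize = 80 * 1024;
static Task CompletedTask = ((Task)Task.FromResult(0));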
static async Task CopyTransformAsync(Stream inputStream
    , Stream outputStream
    , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
    )
{
    var temp = new byte[bufferSize];
    var temp2 = new byte[bufferSize];

    int i = 0;

    var readTask = inputStream
        .ReadAsync(temp, 0, bufferSize)
        .ConfigureAwait(false);

    var writeTask = CompletedTask.ConfigureAwait(false);

    for (; ; )
    {
        // synchronize read
        int read = await readTask;
        if (read == 0)
        {
            break;
        }

        if (i++ > 0)
        {
            // synchronize write
            await writeTask;
        }

        var chunk = new ArraySegment<byte>(temp, 0, read);

        // do transform (if any)
        if (transform != null)
        {
            chunk = transform(chunk);
        }

        // queue write
        writeTask = outputStream
            .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
            .ConfigureAwait(false);

        // queue read
        readTask = inputStream
            .ReadAsync(temp2, 0, bufferSize)
            .ConfigureAwait(false);

        // swap buffer
        var temp3 = temp;
        temp = temp2;
        temp2 = temp3;
    }

    await writeTask; // complete any lingering write task
}

This method of interleaving the read/write, despite the huge buffers, is somewhere around 18% faster than the BCL CopyToAsync.

Out of curiosity, I changed the async calls to the typical Begin/End async pattern and that did not improve the situation one bit; it made it worse. For all I like to bash the Task abstraction overhead, it does some nifty things when you write your code with the async/await keywords, and that code is much nicer to read!
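As an illustration of the transform delegate described in this answer, here is a minimal sketch that computes a SHA-256 digest while the data is copied. The names DigestCopyDemo, CopyWithTransformAsync and CopyAndHashAsync are hypothetical, and the copy helper is a simplified sequential stand-in (no read/write overlap), not the CopyTransformAsync method shown above.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Tasks;

static class DigestCopyDemo
{
    // Simplified stand-in for a copy-with-transform helper (assumption:
    // sequential, one buffer, no overlapping of reads and writes).
    static async Task CopyWithTransformAsync(Stream input, Stream output,
        Func<ArraySegment<byte>, ArraySegment<byte>> transform, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        int read;
        while ((read = await input.ReadAsync(buffer, 0, bufferSize).ConfigureAwait(false)) > 0)
        {
            // Let the delegate observe (or replace) the chunk before it is written.
            var chunk = transform(new ArraySegment<byte>(buffer, 0, read));
            await output.WriteAsync(chunk.Array, chunk.Offset, chunk.Count).ConfigureAwait(false);
        }
    }

    // Copies input to output and returns the SHA-256 digest of the copied bytes.
    public static async Task<byte[]> CopyAndHashAsync(Stream input, Stream output)
    {
        using (var sha = SHA256.Create())
        {
            await CopyWithTransformAsync(input, output, chunk =>
            {
                // Feed each chunk into the running hash; the data passes through unchanged.
                sha.TransformBlock(chunk.Array, chunk.Offset, chunk.Count, null, 0);
                return chunk;
            }, 80 * 1024).ConfigureAwait(false);

            // Finish the hash with an empty final block.
            sha.TransformFinalBlock(new byte[0], 0, 0);
            return sha.Hash;
        }
    }
}
```

The delegate runs between the read and the write of each chunk, so it must finish with the chunk before the buffer is reused for the next read.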

Penstock answered 30/10, 2013 at 20:14 Comment(2)
I realize of course that saying thank you is discouraged on SO. However, as a beginner to C#, your presentation of how "things should be done", or at least "can be done", and especially the test code you provided with your answer, is instrumental for beginner programmers like myself to improve our skills. Honors to you, sir (a point awarded is not enough!). - Tiebold
@AlanWayne glad I could help. Encouragement is meaningful and I think SO is better for it. An upvote is one way to show appreciation; a thoughtful comment like yours is another. - Penstock

Wow, these are all very complex! Here's my async solution, and it's just one function. The Read() and BeginWrite() both run at the same time.

/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
///  and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If a pending write does not complete within 60 seconds, the copy will time out.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
    // This stream copy supports a source-read happening at the same time
    // as target-write.  A simpler implementation would be to use just
    // Write() instead of BeginWrite(), at the cost of speed.

    byte[] readbuffer = new byte[4096];
    byte[] writebuffer = new byte[4096];
    IAsyncResult asyncResult = null;

    for (; ; )
    {
        // Read data into the readbuffer.  The previous call to BeginWrite, if any,
        //  is executing in the background..
        int read = source.Read(readbuffer, 0, readbuffer.Length);

        // Ok, we have read some data and we're ready to write it, so wait here
        //  to make sure that the previous write is done before we write again.
        if (asyncResult != null)
        {
            // This should work down to ~0.01kb/sec
            // WaitOne returns false if the write has not completed within 60 seconds.
            if (!asyncResult.AsyncWaitHandle.WaitOne(60000))
                throw new IOException("Stream write timed out.");
            target.EndWrite(asyncResult); // Last step of the write; rethrows any I/O error.
        }

        if (read <= 0)
            return; // source stream says we're done - nothing else to read.

        // Swap the read and write buffers so we can write what we read, and we can
        //  then use the other buffer for our next read.
        byte[] tbuf = writebuffer;
        writebuffer = readbuffer;
        readbuffer = tbuf;

        // Asynchronously write the data, asyncResult.AsyncWaitHandle will
        // be set when done.
        asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
    }
}
Molar answered 29/12, 2010 at 21:57 Comment(1)
Don't you mean "target.EndWrite(asyncResult);"? - Altorilievo

It's strange that no one mentioned TPL.
Here's a very nice post by the PFX team (Stephen Toub) about how to implement concurrent async stream copy. The post contains an outdated reference to the samples, so here's the correct one:
Get Parallel Extensions Extras from code.msdn, then:

var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait when it finishes:
task.Wait();

Also consider using Jeffrey Richter's AsyncEnumerator.
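For readers on .NET 4.5 or later, a Task-based copy is also available without extra libraries, through the built-in Stream.CopyToAsync. The sketch below is only an illustration of that alternative, not the Parallel Extensions Extras method named above; BuiltInCopyDemo and CopyAsync are hypothetical names, and the 80K buffer size is an assumed value.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class BuiltInCopyDemo
{
    // Copies source to destination with the framework's own CopyToAsync.
    // The returned Task completes when all data has been written and flushed.
    public static async Task CopyAsync(Stream source, Stream destination)
    {
        await source.CopyToAsync(destination, 80 * 1024).ConfigureAwait(false);
        await destination.FlushAsync().ConfigureAwait(false);
    }
}
```

The caller can await the returned task, or call Wait() on it as in the snippet above when blocking is acceptable.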

Scholiast answered 11/7, 2011 at 16:51 Comment(1)
Considering this reads like a homework assignment, I doubt third-party libraries are allowed :D - Wandy

You're right: what you're doing is basically synchronous reading, because WaitOne() simply stops execution until the data is ready. That's basically the same as calling Read() instead of BeginRead() and EndRead().

What you have to do is use the callback argument of BeginRead(). With it, you supply a callback method (or a lambda expression) that is invoked when the data has been read. In the callback you call EndRead() to get the number of bytes read, check for the end of the stream, and write to the output stream. This way you won't block the main thread, and you won't need WaitOne().
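A minimal sketch of that callback approach, without lambdas and without static fields, is to pass a small state object through the last argument of BeginRead(). CopyState, CallbackCopy and StartCopy are hypothetical names introduced here for illustration; the sketch takes Stream parameters so it also works with NetworkStream and FileStream.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical state object: carries everything the callback needs,
// so nothing is shared between concurrent copies.
sealed class CopyState
{
    public Stream Source;
    public Stream Target;
    public byte[] Buffer = new byte[4096];
    public ManualResetEvent Done = new ManualResetEvent(false);
    public Exception Error; // set if the copy fails
}

static class CallbackCopy
{
    // Starts the copy and returns immediately; the caller is not blocked.
    public static CopyState StartCopy(Stream source, Stream target)
    {
        CopyState state = new CopyState();
        state.Source = source;
        state.Target = target;
        source.BeginRead(state.Buffer, 0, state.Buffer.Length, OnEndRead, state);
        return state;
    }

    // Named callback method (no lambda); invoked when a read completes.
    private static void OnEndRead(IAsyncResult ar)
    {
        CopyState state = (CopyState)ar.AsyncState;
        try
        {
            int n = state.Source.EndRead(ar);   // number of bytes read
            if (n == 0)
            {
                state.Done.Set();               // end of stream: copy finished
                return;
            }
            state.Target.Write(state.Buffer, 0, n); // synchronous write
            // Issue the next asynchronous read with the same state object.
            state.Source.BeginRead(state.Buffer, 0, state.Buffer.Length, OnEndRead, state);
        }
        catch (Exception ex)
        {
            state.Error = ex;
            state.Done.Set();
        }
    }
}
```

A caller that needs to know when the copy has finished can wait on state.Done and then check state.Error; a caller that does not care can ignore the returned object.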

Hope this helps.

Nummulite answered 8/10, 2009 at 21:56 Comment(0)
