Reading from Stream using Observable through FromAsyncPattern, how to close/cancel properly
Need: long-running program with TCP connections

A C# 4.0 (VS2010, XP) program needs to connect to a host using TCP, send and receive bytes, and sometimes close the connection properly and reopen it later. The surrounding code is written in the Rx.Net Observable style. The volume of data is low, but the program should run continuously, so resources must be disposed of properly to avoid memory leaks.

The text below is long because I explain what I searched and found. It now appears to work.

The overall questions are: since Rx is sometimes unintuitive, are the solutions good? Will they be reliable (say, could the program run for years without trouble)?

Solution so far

Send

The program obtains a NetworkStream like this:

TcpClient tcpClient = new TcpClient();
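// Linger disabled: Close() returns immediately, and the stack still tries to send pending data in the background.
// (Discarding pending data and resetting the connection on Close() would need new LingerOption(true, 0).)
LingerOption lingerOption = new LingerOption(false, 0);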
tcpClient.LingerState = lingerOption;

tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();

Asynchronous sending is easy enough. Rx.Net handles this with much shorter and cleaner code than traditional solutions. I created a dedicated thread with an EventLoopScheduler. The operations that need a send are expressed using IObservable. Using ObserveOn(sendRecvThreadScheduler) guarantees that all send operations are done on that thread.

sendRecvThreadScheduler = new EventLoopScheduler(
    ts =>
    {
        var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
        return thread;
    });
// Loop code for sending not shown (too long and off-topic).
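
Since the send loop is not shown, here is a minimal sketch of how outgoing messages might be written on the scheduler's thread. The helper name SendOn and the IObservable<byte[]> source are hypothetical and not the original code; only ObserveOn and the scheduler come from the description above.

```csharp
using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SendSketch
{
    // Hypothetical helper: every message is written to the stream on the
    // thread owned by the given scheduler (e.g. the EventLoopScheduler above).
    public static IDisposable SendOn(this IObservable<byte[]> outgoing, Stream stream, IScheduler scheduler)
    {
        return outgoing
            .ObserveOn(scheduler) // hop to the dedicated send+receive thread
            .Subscribe(bytes => stream.Write(bytes, 0, bytes.Length));
    }
}
```

Disposing of the returned IDisposable would stop further writes, which fits the "close and reopen later" requirement.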

So far this is excellent and flawless.

Receive

It seems that Rx.Net should also allow shorter and cleaner code than traditional solutions for receiving data. After reading several resources (e.g. http://www.introtorx.com/) and Stack Overflow, it seems that a very simple solution is to bridge the Asynchronous Programming Model (BeginRead/EndRead) to Rx.Net, as in https://mcmap.net/q/1427816/-what-is-the-proper-way-to-create-an-observable-which-reads-a-stream-to-the-end :

public static class Ext
{
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead,
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

It mostly works. I can send and receive bytes.

Close time

This is when things start to go wrong.

Sometimes I need to close the stream and keep things clean. Basically this means: stop reading, end the byte-receiving observable, and open a new connection with a new observable.

For one thing, when the connection is forcibly closed by the remote host, BeginRead()/EndRead() return zero bytes immediately, so the read loop spins and consumes all the CPU. I let higher-level code notice this (with a Subscribe() to the ReadObservable in a context where high-level elements are available) and clean up (including closing and disposing of the stream). This works well, too, and I take care to dispose of the object returned by Subscribe().

someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
{
    if (buf.Length == 0)
    {
        MyLoggerLog("Read explicitly returned zero bytes.  Closing stream.");
        this.pscDestroyIfAny();
    }
});
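
One way to avoid the busy loop might be to treat a zero-byte read as the end of the stream inside the observable itself. The sketch below is a minimal variant of the extension method above that adds Rx.Net's TakeWhile operator; the name ReadUntilEndObservable is hypothetical and not part of the original code.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Reactive.Linq;

public static class ReadUntilEndSketch
{
    // Hypothetical variant of ReadObservable: the sequence completes on the
    // first zero-byte read instead of emitting empty buffers forever.
    public static IObservable<byte[]> ReadUntilEndObservable(this Stream stream, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);

        return Observable.While(
                () => stream.CanRead,
                Observable.Defer(() => asyncRead(buffer, 0, bufferSize)))
            // A read of zero bytes means the other side closed the connection.
            .TakeWhile(readBytes => readBytes > 0)
            // Copy the bytes out before the shared buffer is reused.
            .Select(readBytes => buffer.Take(readBytes).ToArray());
    }
}
```

With this variant the subscriber would see OnCompleted rather than an empty buffer, so the cleanup could move to the completion callback of Subscribe().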

Sometimes, I just need to close the stream. But apparently this necessarily causes an exception to be thrown in the pending asynchronous read (see the Stack Overflow question "Proper way to prematurely abort BeginRead and BeginWrite?").

I added a CancellationToken that causes Observable.While() to end the sequence. This does not help much to avoid these exceptions, since a pending BeginRead() can wait for a long time before the token is checked again.

An unhandled exception in the observable caused the program to exit. Searching turned up the Stack Overflow question "Continue using subscription after exception", which suggested adding a Catch that effectively replaces the broken Observable with an empty one.

Code looks like this:

public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
    // to hold read data
    var buffer = new byte[bufferSize];
    // Step 1: async signature => observable factory
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead,
        stream.EndRead);
    return Observable.While(
        // while there is data to be read
        () =>
        {
            return (!token.IsCancellationRequested) && stream.CanRead;
        },
        // iteratively invoke the observable factory, which will
        // "recreate" it such that it will start from the current
        // stream position - hence "0" for offset
        Observable.Defer(() =>
            {
                if ((!token.IsCancellationRequested) && stream.CanRead)
                {
                    return asyncRead(buffer, 0, bufferSize);
                }
                else
                {
                    return Observable.Empty<int>();
                }
            })
            // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
            .Catch(Observable.Empty<int>())
            .Select(readBytes => buffer.Take(readBytes).ToArray()));
}
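
Because a pending BeginRead() only returns once data arrives or the stream is closed, one possible complement is to close the stream as soon as the token is cancelled. The sketch below assumes CancellationToken.Register and Observable.Using; the helper name CloseOnCancel is hypothetical. The exception raised in the pending read would then be absorbed by the Catch shown above.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;

public static class CloseOnCancelSketch
{
    // Hypothetical helper: while subscribed, cancelling the token closes the
    // stream, which forces a pending asynchronous read to finish.
    public static IObservable<T> CloseOnCancel<T>(this IObservable<T> source, Stream stream, CancellationToken token)
    {
        return Observable.Using(
            // The registration is created per subscription and disposed
            // when the subscription ends.
            () => token.Register(stream.Close),
            registration => source);
    }
}
```

A possible use would be stream.ReadObservable(4096, token).CloseOnCancel(stream, token), where 4096 is an arbitrary buffer size.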

What now? Question

This appears to work well. Conditions where the remote host forcibly closed the connection or is no longer reachable are detected, causing higher-level code to close the connection and retry. So far so good.

I'm unsure whether this is quite right.

For one thing, that line:

.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

feels like the bad practice of an empty catch block in imperative code. The actual code does log the exception, and higher-level code detects the absence of a reply and handles it correctly, so can it be considered fairly okay (see below)?

.Catch((Func<Exception, IObservable<int>>)(ex =>
{
    MyLoggerLogException("On asynchronous read from network.", ex);
    return Observable.Empty<int>();
})) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
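
To stay further away from the "empty catch block" feeling, one option might be to swallow only the exception types expected when a stream is closed and let anything else reach OnError. The sketch below assumes those types are IOException and ObjectDisposedException, which is an assumption worth checking against the actual logs; the helper name EndOnStreamClosed and the log callback are hypothetical.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class CatchSketch
{
    // Hypothetical helper: ends the sequence quietly for exceptions that are
    // assumed to mean "stream closed"; other exceptions still propagate.
    public static IObservable<T> EndOnStreamClosed<T>(this IObservable<T> source, Action<Exception> log)
    {
        return source
            .Catch<T, IOException>(ex =>
            {
                log(ex);
                return Observable.Empty<T>();
            })
            .Catch<T, ObjectDisposedException>(ex =>
            {
                log(ex);
                return Observable.Empty<T>();
            });
    }
}
```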

Also, this is indeed shorter than most traditional solutions.

Are the solutions correct or did I miss some simpler/cleaner ways?

Are there some dreadful problems that would look obvious to wizards of Reactive Extensions?

Thank you for your attention.

Thereon answered 18/8, 2015 at 17:48 Comment(3)
Are you open to throwing Rx away and using traditional await-based loops for processing data? - Naquin
Thank you. async/await are from VS2012/C# 5.0 (.NET 4.5), so the short answer is "no". Yet can you elaborate? async/await makes locally concurrent code resemble synchronous code, which is double-edged. Could something like this be expanded simply to handle "accept new commands any time from any thread + send + guarantee delay between sends + receive + detect losses + retry + autoreconnect" logic? The current Observable + event loop approach has a learning curve but looks explicit and works. Any pitfalls? Thanks. - Spectrohelioscope
You can use await with .NET 4.0 and VS12. You can use "fake-await" based on iterators with 4.0 and 2010. Things like "guarantee delay between sends" indeed sound a little like Rx, but some simple DateTime-based logic should be able to do that. Retry and reconnect are not problems that await would have difficulties with. Semantically, await does nothing. It simply untangles the callback mess and puts it into a readable form where you can use the usual control flow constructs. - Naquin
