Need: long-running program with TCP connections
A C# 4.0 (VS2010, XP) program needs to connect to a host using TCP, send and receive bytes, sometimes close the connection properly and reopen it later. Surrounding code is written using Rx.Net Observable style. The volume of data is low but the program should run continuously (avoiding memory leaks by taking care to dispose of resources properly).
The text below is long because I explain what I searched and found. It now appears to work.
Overall questions are: since Rx is sometimes unintuitive, are the solutions good? Will that be reliable (say, may it run for years without trouble)?
Solution so far
Send
The program obtains a NetworkStream like this:
TcpClient tcpClient = new TcpClient();
LingerOption lingerOption = new LingerOption(false, 0); // Make sure that on call to Close(), connection is closed immediately even if some data is pending.
tcpClient.LingerState = lingerOption;
tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();
Asynchronous sending is easy enough. Rx.Net handles this with much shorter and cleaner code than traditional solutions. I created a dedicated thread with an EventLoopScheduler. The operations needing a send are expressed using IObservable. Using ObserveOn(sendRecvThreadScheduler) guarantees that all send operations are done on that thread.
sendRecvThreadScheduler = new EventLoopScheduler(
    ts =>
    {
        var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
        return thread;
    });
// Loop code for sending not shown (too long and off-topic).
So far this is excellent and flawless.
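A minimal sketch of the ObserveOn pattern described above, assuming Rx 2.x namespaces. This is not the actual send loop: `SendSketch`, `SendOn` and `onSendFailed` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SendSketch
{
    // Hypothetical helper: writes each buffer pushed through 'outgoing'
    // to 'stream', always on the thread owned by 'scheduler'.
    public static IDisposable SendOn(
        this IObservable<byte[]> outgoing,
        Stream stream,
        IScheduler scheduler,
        Action<Exception> onSendFailed)
    {
        return outgoing
            .ObserveOn(scheduler) // hop to the send/receive thread
            .Subscribe(bytes =>
            {
                try
                {
                    stream.Write(bytes, 0, bytes.Length);
                }
                // A failed write is reported to the caller instead of
                // escaping onto the scheduler thread.
                catch (IOException ex) { onSendFailed(ex); }
                catch (ObjectDisposedException ex) { onSendFailed(ex); }
            });
    }
}
```

Disposing the returned IDisposable stops further writes, which fits the goal of releasing resources when a connection is replaced.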
Receive
It seems that Rx.Net should also allow shorter and cleaner code than traditional solutions for receiving data. After reading several resources (e.g. http://www.introtorx.com/ ) and Stack Overflow, a very simple solution seems to be bridging the Asynchronous Programming Model (BeginRead/EndRead) to Rx.Net, as in https://mcmap.net/q/1427816/-what-is-the-proper-way-to-create-an-observable-which-reads-a-stream-to-the-end :
public static class Ext
{
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead,
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}
It mostly works. I can send and receive bytes.
Close time
This is when things start to go wrong.
Sometimes I need to close the stream and keep things clean. Basically this means: stop reading, end the byte-receiving observable, open a new connection with a new one.
For one thing, when the connection is forcibly closed by the remote host, BeginRead()/EndRead() return zero bytes immediately, so the read loop spins and consumes all CPU. I let higher-level code notice this (with a Subscribe() to the ReadObservable in a context where high-level elements are available) and clean up (including closing and disposing of the stream). This works well, too, and I take care to dispose of the object returned by Subscribe().
someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
{
    if (buf.Length == 0)
    {
        MyLoggerLog("Read explicitly returned zero bytes. Closing stream.");
        this.pscDestroyIfAny();
    }
});
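A possible way to stop the spinning at the source, sketched under assumptions: a read that returns zero bytes signals end of stream, while CanRead apparently stays true until the stream is closed locally, so the While condition alone never ends the loop. Adding TakeWhile completes the sequence on the first zero-byte read. `ReadUntilClosed` is a hypothetical variant of ReadObservable, not code from the linked answer.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Reactive.Linq;

public static class ReadUntilClosedSketch
{
    // Hypothetical variant of ReadObservable: the sequence completes
    // as soon as a read returns 0 bytes (end of stream).
    public static IObservable<byte[]> ReadUntilClosed(this Stream stream, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);

        return Observable
            .While(
                () => stream.CanRead,
                Observable.Defer(() => asyncRead(buffer, 0, bufferSize)))
            // Stop repeating the read once the stream reports end of data.
            .TakeWhile(readBytes => readBytes > 0)
            // Copy out only the bytes actually read before the buffer is reused.
            .Select(readBytes => buffer.Take(readBytes).ToArray());
    }
}
```

With this shape the subscriber would see OnCompleted rather than empty buffers, so the cleanup done in the `buf.Length == 0` branch would move to the completion handler.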
Sometimes, I just need to close the stream. But apparently this necessarily causes exceptions to be thrown in the asynchronous read (see the Stack Overflow question "Proper way to prematurely abort BeginRead and BeginWrite?"). I added a CancellationToken that causes Observable.While() to end the sequence. This does not help much to avoid these exceptions, since BeginRead() can stay pending for a long time.
An unhandled exception in the observable caused the program to exit. Searching turned up the Stack Overflow question "Continue using subscription after exception", which suggested adding a Catch that effectively replaces the broken Observable with an empty one.
Code looks like this:
public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
    // to hold read data
    var buffer = new byte[bufferSize];
    // Step 1: async signature => observable factory
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead,
        stream.EndRead);
    return Observable.While(
        // while there is data to be read
        () =>
        {
            return (!token.IsCancellationRequested) && stream.CanRead;
        },
        // iteratively invoke the observable factory, which will
        // "recreate" it such that it will start from the current
        // stream position - hence "0" for offset
        Observable.Defer(() =>
        {
            if ((!token.IsCancellationRequested) && stream.CanRead)
            {
                return asyncRead(buffer, 0, bufferSize);
            }
            else
            {
                return Observable.Empty<int>();
            }
        })
        .Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
        .Select(readBytes => buffer.Take(readBytes).ToArray()));
}
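One option that might complement the CancellationToken, sketched here rather than tested against a real socket: Observable.Using ties a resource to a subscription, so the stream is disposed when the sequence terminates or the subscription is disposed. `ReadWithOwnedStream`, `openStream` and `read` are hypothetical names.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class ConnectionLifetimeSketch
{
    // Hypothetical helper: opens one stream per subscription and lets Rx
    // dispose it when the sequence ends or the subscriber unsubscribes.
    public static IObservable<byte[]> ReadWithOwnedStream(
        Func<Stream> openStream,
        Func<Stream, IObservable<byte[]>> read)
    {
        return Observable.Using(openStream, read);
    }
}
```

Usage might look like `ReadWithOwnedStream(() => OpenStream(), s => s.ReadObservable(4096, token))`, where OpenStream is an assumed wrapper around the connection code. A read that is still pending when the stream is disposed can presumably still fail, so this addresses ownership of the stream, not the exceptions themselves.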
What now? Question
This appears to work well. Conditions where remote host forcibly closed the connection or is just no longer reachable are detected, causing higher level code to close the connection and retry. So far so good.
I'm unsure if things feel quite right.
For one thing, that line:
.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
feels like the bad practice of an empty catch block in imperative code. The actual code does log the exception, and higher-level code detects the absence of a reply and handles it correctly, so should it be considered fairly okay (see below)?
.Catch((Func<Exception, IObservable<int>>)(ex =>
{
    MyLoggerLogException("On asynchronous read from network.", ex);
    return Observable.Empty<int>();
})) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
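To make the catch narrower than a catch-all, one possibility is the typed Catch overload, which handles only the named exception type and lets everything else reach OnError. The sketch below assumes that ObjectDisposedException and IOException are the exceptions seen when the stream is closed during a pending read; `EndOnExpectedReadErrors` and `log` are hypothetical names.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class ReadErrorPolicySketch
{
    // Hypothetical helper: ends the sequence quietly for the exception
    // types assumed to accompany a closed stream, and logs them.
    // Any other exception still propagates to the subscriber's OnError.
    public static IObservable<int> EndOnExpectedReadErrors(
        this IObservable<int> reads,
        Action<Exception> log)
    {
        return reads
            .Catch<int, ObjectDisposedException>(ex =>
            {
                log(ex);
                return Observable.Empty<int>();
            })
            .Catch<int, IOException>(ex =>
            {
                log(ex);
                return Observable.Empty<int>();
            });
    }
}
```

It would take the place of the untyped Catch in the read pipeline, so that an unexpected failure (a bug, say) is not silently turned into an empty sequence.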
Also, this is indeed shorter than most traditional solutions.
Are the solutions correct or did I miss some simpler/cleaner ways?
Are there some dreadful problems that would look obvious to wizards of Reactive Extensions?
Thank you for your attention.