What is the proper way to create an Observable which reads a stream to the end

I'm struggling here. Normally I'd read a book but there aren't any yet. I've found countless examples of various things to do with reading streams using RX but I'm finding it very hard to get my head around.

I know I can use Observable.FromAsyncPattern to create a wrapper of the Stream's BeginRead/EndRead or BeginReadLine/EndReadLine methods.

But this only reads once -- when the first observer subscribes.

I want an Observable which will keep reading and pumping OnNext until the stream errors or ends.

In addition to this, I'd also like to know how I can then share that observable with multiple subscribers so they all get the items.

Blackness answered 22/1, 2013 at 8:58 Comment(3)
Great article, but massively out of date: it references the old API. This chapter* takes those concepts and makes them work with a more recent version of Rx. *introtorx.com/Content/v1.0.10621.0/…Lithia
Ooh, never knew about that link @LeeCampbell - very nice!Voltaism
What on earth has happened to this question? Where are the earlier answers?Disquieting

The solution is to use Observable.Create.

Here is an example which can be adapted for reading any kind of stream:

    using System;
    using System.Diagnostics;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public static class CommandReaderExtensions
    {
        public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
        {
            return Observable.Create<Command>(async (observer, token) =>
            {
                try
                {
                    // keep reading until the subscription is disposed or the stream ends
                    while (!token.IsCancellationRequested)
                    {
                        //this part here can be changed to something like this
                        //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);

                        Command cmd = await reader.ReadCommandAsync();

                        // Assumption: ReadCommandAsync returns null at end of stream;
                        // adjust this check to however your reader signals the end.
                        if (cmd == null)
                        {
                            break;
                        }

                        observer.OnNext(cmd);
                    }

                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    try
                    {
                        observer.OnError(ex);
                    }
                    catch (Exception)
                    {
                        Debug.WriteLine("An exception was thrown while trying to call OnError on the observer -- means you're not catching exceptions everywhere");
                        throw;
                    }
                }
            }).Publish();
        }
    }

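Don't forget to call Connect() on the returned IConnectableObservable: nothing is read until you do, and a subscriber that attaches after Connect() only receives the commands pushed after it subscribed.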
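Because CommandReader and Command are the answerer's own types, here is a self-contained sketch of the same Observable.Create + Publish pattern over a TextReader, assuming the System.Reactive NuGet package (Rx.NET). PublishedLineReader and ReadLinesPublished are hypothetical names. The loop stops when ReadLineAsync returns null, which is how TextReader signals the end of the data.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishedLineReader
{
    // Hypothetical helper: one read loop, started by Connect(), shared by every subscriber.
    public static IConnectableObservable<string> ReadLinesPublished(this TextReader reader)
    {
        return Observable.Create<string>(async (observer, token) =>
        {
            // An exception thrown in here faults the returned Task, which Rx forwards to OnError.
            while (!token.IsCancellationRequested)
            {
                var line = await reader.ReadLineAsync();
                if (line == null)
                {
                    break; // end of the underlying data
                }
                observer.OnNext(line);
            }
            observer.OnCompleted(); // loop ended: end of stream or cancellation
        }).Publish();
    }
}
```

Attach every subscriber first and only then call Connect(); that ordering is what guarantees that all of them see every line, since Publish does not buffer anything for late subscribers.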

Blackness answered 12/2, 2013 at 21:27 Comment(0)

You can use Repeat to keep reading lines until the end of the stream, and Publish or Replay to control how the lines are shared across multiple subscribers.

An example of a simple, full Rx solution for reading lines from any stream until the end would be:

public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}

This solution also takes advantage of the fact that ReadLineAsync produces null once the end of the stream is reached.
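The answer mentions Replay for sharing but does not show it. A minimal sketch, assuming the System.Reactive NuGet package; SharedLinesDemo and ReadTwiceAsync are hypothetical names, and ReadLines is the method above with an explicit lambda instead of the method group.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class SharedLinesDemo
{
    // The answer's ReadLines, unchanged apart from the explicit lambda.
    public static IObservable<string> ReadLines(Stream stream)
    {
        return Observable.Using(
            () => new StreamReader(stream),
            reader => Observable.FromAsync(() => reader.ReadLineAsync())
                                .Repeat()
                                .TakeWhile(line => line != null));
    }

    // Replay() reads the stream once and caches every line, so a subscriber that
    // arrives after the stream has been fully read still receives all of it.
    public static async Task<(IList<string> Early, IList<string> Late)> ReadTwiceAsync(Stream stream)
    {
        var shared = ReadLines(stream).Replay();
        var earlyTask = shared.ToList().ToTask(); // subscribes before Connect()
        using (shared.Connect())
        {
            IList<string> early = await earlyTask;
            IList<string> late = await shared.ToList(); // subscribes after completion
            return (early, late);
        }
    }
}
```

Publish() would be enough if every subscriber is attached before Connect(); Replay() trades memory (it keeps every line) for letting late subscribers catch up.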

Guncotton answered 15/11, 2016 at 21:10 Comment(2)
I've tested your solution and it seems it's using several threads to read from the stream. This could have unintended side effects. Can you please check?Dogma
@Dogma FromAsync will use the default system Task scheduler, which usually goes to the ThreadPool. This could be changed by using the FromAsync overload that takes a factory method. However, using the thread pool won't change the order of lines, if that is your concern, because of the Repeat() semantics.Guncotton

Adding to Lee's answer, using Rxx:

using (new FileStream(@"filename.txt", FileMode.Open)
       .ReadToEndObservable()
       .Subscribe(x => Console.WriteLine(x.Length)))
{
  Console.ReadKey();
}

This prints the length of each buffer read from the file.

Cropdusting answered 22/1, 2013 at 13:37 Comment(5)
Disappeared for some reasonCropdusting
His answer was deleted because it didn't actually provide an answer to the question -- just a link to a [his] bookNosh
Is the Rxx project dead?Dogma
@Dogma looks like, unfortunately. The ReadToEndObservable source is: here and could probably be simulatedCropdusting
Thanks! is there any project similar to Rxx? I found it really interesting.Dogma

Heh - gonna reuse one of my other answers here (well, part of it, anyways):

Ref: Reading from NetworkStream corrupts the buffer

In that, I've got an extension method like this:

public static class Ext
{        
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {        
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead, 
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead, 
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

You can probably use this as written in a form like so:

// Note: ToEnumerable works here because your filestream
// has a finite length - don't do this with infinite streams!
var blobboData  = stream
     .ReadObservable(bufferSize)
     // take while we're still reading data
     .TakeWhile(returnBuffer => returnBuffer.Length > 0)
     .ToEnumerable()
     // mash them all together
     .SelectMany(buffer => buffer)
     .ToArray();
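Observable.FromAsyncPattern dates from Rx v1; newer code generally wraps the Task-based Stream.ReadAsync instead. Here is a rough port of the same idea, assuming the System.Reactive NuGet package; ModernStreamExtensions and CollectAllBytesAsync are hypothetical names. It emits one copied byte[] per read and completes when ReadAsync returns 0, which also replaces the TakeWhile and ToEnumerable steps above.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ModernStreamExtensions
{
    // One byte[] per ReadAsync call; completes when ReadAsync reports 0 bytes (end of stream).
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {
        return Observable.Defer(() =>
        {
            // a fresh buffer per subscription, so subscribers never share one array
            var buffer = new byte[bufferSize];
            return Observable
                .FromAsync(ct => stream.ReadAsync(buffer, 0, buffer.Length, ct))
                .Repeat()                       // issue the next read after each one completes
                .TakeWhile(read => read > 0)    // 0 bytes read means end of stream
                .Select(read =>
                {
                    // copy only the bytes this read produced, before the buffer is reused
                    var chunk = new byte[read];
                    Array.Copy(buffer, chunk, read);
                    return chunk;
                });
        });
    }

    // Replaces the ToEnumerable().SelectMany(...).ToArray() pipeline without blocking.
    public static async Task<byte[]> CollectAllBytesAsync(this Stream stream, int bufferSize)
    {
        return await stream.ReadObservable(bufferSize)
                           .SelectMany(chunk => chunk)
                           .ToArray();
    }
}
```

Note that two subscribers still compete for the same Stream position, so the .Publish() advice in the last comment below applies here too.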
Voltaism answered 22/1, 2013 at 17:6 Comment(3)
Hello JerKimball, thank you for your solution. I've been using it, but I've had a hard time handling error conditions. How do you handle the case where you have to close the connection? I've opened question #32079792 about that if you can check it out.Slipon
Rxx seems to do it very differently. Can anyone elaborate? github.com/RxDave/Rxx/blob/master/Main/Source/…Slipon
There's a gotcha with this implementation. Different subscribers will get different shares of the stream data! One has to append a .Publish().Connect() to ensure all subscribers get all data.Slipon

This is based on glope's answer. Might not be completely correct, but it's short and it's quite declarative. Moreover, it supports cancellation.

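using System;
using System.IO;
using System.Linq;
using System.Reactive.Linq;

public static class StreamExtensions
{
    public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
    {
        var buffer = new byte[bufferSize];

        return Observable
            .FromAsync(async ct => (bytesRead: await stream.ReadAsync(buffer, 0, buffer.Length, ct), buffer))
            .Repeat()
            .TakeWhile(x => x.bytesRead != 0)
            // only the first bytesRead bytes are valid; the rest of the buffer is stale data
            .Select(x => x.buffer.Take(x.bytesRead).ToArray())
            .SelectMany(x => x);
    }
}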

BEWARE

I've tested this solution and it seems it's using several threads to read from the stream. This could have unintended side effects. I hope somebody can come up with a better solution.

Dogma answered 27/2, 2023 at 15:53 Comment(0)

Out of some investigation, I've come to this code:

public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
    return Observable.Create<byte>(async (s, ct) =>
    {
        try
        {
            var buffer = new byte[bufferSize];
            int bytesRead;
            while (!ct.IsCancellationRequested && (bytesRead = await stream.ReadAsync(buffer, ct)) > 0)
            {
                // forward only the bytes produced by this read, not the whole buffer
                for (var i = 0; i < bytesRead; i++)
                {
                    s.OnNext(buffer[i]);
                }
            }
            s.OnCompleted();
        }
        catch (Exception e)
        {
            s.OnError(e);
        }
    });
}

It seems to work, although I don't like the iterative nature of it. I hope somebody with more knowledge can come up with a more elegant solution.

Dogma answered 2/3, 2023 at 22:40 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.