Consuming a custom stream (IEnumerable<T>)

I'm using a custom Stream implementation that streams an IEnumerable<T>. I'm using this EnumerableStream implementation to perform the conversion.

I'm using it for streaming over WCF in streaming mode. I'm able to convert the IEnumerable to a stream without problems. Once I'm on the client side, I can deserialize and get all the data; however, I can't find the right condition to stop looping over the stream, so I get:

System.Runtime.Serialization.SerializationException: End of Stream encountered before parsing was completed.

Here's a sample of what I'm trying to achieve:

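using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;

class Program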
{
    public static void Main()
    {
        var ListToSend = new List<List<string>>();
        var ListToReceive = new List<List<string>>();
        ListToSend = SimulateData().ToList();
        using (Stream stream = GetStream(ListToSend))
        {
            var formatter = new BinaryFormatter();
            while (true) // Tried stream.CanRead, 1 == 1, true... What should go here to stop once everything has been read???
            {
                List<string> row = formatter.Deserialize(stream) as List<string>;
                ListToReceive.Add(row);
            }
            Printer(ListToReceive);
            Console.WriteLine("Done");
        }
    }

    private static void Printer(List<List<string>> data)
    {
        Console.WriteLine("Printing");
        foreach (var row in data)
        {
            foreach (var cell in row)
            {
                Console.Write(cell + "\t");
            }
            Console.WriteLine("-------------------------------------------------------------------------------");
        }
    }
    private static Stream GetStream(IEnumerable<List<string>> data)
    {
        return EnumerableStream.Create(data, SerializerCallback);
    }

    // Serializes a single element to bytes; the stream calls this once per element.
    private static List<byte> SerializerCallback(object obj)
    {
        var binFormatter = new BinaryFormatter();
        using (var mStream = new MemoryStream())
        {
            binFormatter.Serialize(mStream, obj);
            return mStream.ToArray().ToList();
        }
    }

    private static IEnumerable<List<string>> SimulateData()
    {
        Random randomizer = new Random();
        for (var i = 0; i < 10; i++)
        {
            var row = new List<string>();
            for (var j = 0; j < 1000; j++)
            {
                row.Add((randomizer.Next(100)).ToString());
            }
            yield return row;
        }
    }
}

I did not include the custom stream here. I created a fiddle for those who want to see the entire code.

  • Do I need to add something in the custom stream itself to notify that all the data have been read?
  • Is it because the formats of the deserializer and serializer are not the same? (I don't think so.)
  • I also want to know why, when I put a breakpoint in the Read function, the buffer size changes randomly.
  • I would prefer not to wrap the code in a try/catch; I want a clean solution that does not crash.
Overcash answered 23/8, 2018 at 19:51 Comment(7)
Exceptions are not nice, but when working with external code or libraries (like BinaryFormatter), you cannot always avoid them, nor does it always make sense to avoid them in the first place. In the end, though, I'm on your side on this. However, it would have been nice to know your intentions before you complained about that answer or downvoted it. – Maurreen
The first thing BinaryFormatter does is read a header record. In that process it reads 17 bytes from the stream, and when the data it gets is shorter than 17 bytes, it throws an end-of-stream exception. So, as soon as you ask the BinaryFormatter to deserialize after all the data has been consumed, you will get an exception. A "proper" solution, and not uncommon when you design any protocol, would be to send the number of items in the stream before the first item, and not read more than that. – Maurreen
Found a proper solution. No try. No catch. See my (updated) old answer. – Maurreen
Just an FYI: BinaryFormatter is not recommended for inter-machine communication or for persisted data. It is extremely intolerant of assembly version changes; if one side has updates the other does not, they won't be able to talk to each other. – Verdieverdigris
@ScottChamberlain: What do you suggest using? protobuf-net? – Overcash
@Overcash That is what I usually use. – Verdieverdigris
How about doing it the classic way and just writing a serializer/deserializer for JSON or similar? – Maurreen
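The count-prefix protocol suggested in the comments can be sketched as follows, for the case where the number of items is known before sending. This is a hedged illustration, not code from the thread: CountPrefixedRows and its Write/Read methods are hypothetical names, and the sketch uses BinaryWriter/BinaryReader instead of BinaryFormatter (which is obsolete in current .NET).

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical helper (not from the question): count-prefixed framing for rows of strings.
public static class CountPrefixedRows
{
    // Writes the number of rows first, then each row as (cell count, cells...).
    public static void Write(Stream output, IReadOnlyList<List<string>> rows)
    {
        using (var writer = new BinaryWriter(output, Encoding.UTF8, leaveOpen: true))
        {
            writer.Write(rows.Count);          // header: how many rows follow
            foreach (var row in rows)
            {
                writer.Write(row.Count);       // per-row header: how many cells follow
                foreach (var cell in row)
                    writer.Write(cell);        // length-prefixed UTF-8 string
            }
        }
    }

    // Reads exactly as many rows as the header announced, so it never
    // asks for data past the end of the stream.
    public static List<List<string>> Read(Stream input)
    {
        var result = new List<List<string>>();
        using (var reader = new BinaryReader(input, Encoding.UTF8, leaveOpen: true))
        {
            int rowCount = reader.ReadInt32();
            for (int i = 0; i < rowCount; i++)
            {
                int cellCount = reader.ReadInt32();
                var row = new List<string>(cellCount);
                for (int j = 0; j < cellCount; j++)
                    row.Add(reader.ReadString());
                result.Add(row);
            }
        }
        return result;
    }
}
```

Because the reader stops after exactly rowCount rows, no end-of-stream probing is needed. The trade-off is that the sender must know the count up front, which an IEnumerable<T> being streamed lazily generally does not.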

Do I need to add something in the custom stream itself to notify that all the data have been read?

You can, but that wouldn't help in the WCF scenario where the received Stream is a different class.

There are two standard (official, by design) ways of determining the end of the Stream data:

(1) ReadByte returning -1

Returns

The unsigned byte cast to an Int32, or -1 if at the end of the stream.

(2) Read returning 0 when called with count > 0

Returns

The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.

Unfortunately, both consume the current byte (advance to the next one), which will break the deserializer.
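A small sketch on a MemoryStream shows both documented signals and why probing with them is destructive. EndOfStreamDemo is a hypothetical name used only for illustration.

```csharp
using System.IO;

// Shows the two documented end-of-stream signals, and that probing with
// ReadByte consumes the byte it returns (the position moves forward).
public static class EndOfStreamDemo
{
    public static (int FirstByte, long PositionAfterProbe, int ReadByteAtEnd, int ReadAtEnd) Run()
    {
        var stream = new MemoryStream(new byte[] { 10, 20, 30 });

        int firstByte = stream.ReadByte();          // returns 10 and advances past it
        long positionAfterProbe = stream.Position;  // 1: the probed byte is no longer "next"

        // Drain the remaining bytes; Read returns 0 once nothing is left.
        var buffer = new byte[8];
        while (stream.Read(buffer, 0, buffer.Length) > 0) { }

        int readByteAtEnd = stream.ReadByte();                  // -1 at end of stream
        int readAtEnd = stream.Read(buffer, 0, buffer.Length);  // 0 at end of stream
        return (firstByte, positionAfterProbe, readByteAtEnd, readAtEnd);
    }
}
```

If a deserializer had been about to read that first byte as part of its header, the probe would have stolen it, which is exactly the problem described above.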

What are the possible solutions?

First, implement some serialization/deserialization format (protocol) that lets you know whether there are more elements to deserialize. For instance, List<T> stores Count before the elements, T[] stores Length before the elements, etc. Since EnumerableStream<T> does not know the count in advance, one simple solution would be to emit a single marker byte before each element:

private bool SerializeNext()
{
    if (!_source.MoveNext())
        return false;

    _buf.Enqueue(1); // <-- marker byte: "another element follows"
    foreach (var b in _serializer(_source.Current))
        _buf.Enqueue(b);

    return true;
}

This would allow you to use

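while (stream.ReadByte() != -1) // consumes the marker byte written before each element
{
    List<string> row = formatter.Deserialize(stream) as List<string>;
    ListToReceive.Add(row);
}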
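To see the marker-byte framing end to end, here is a hedged, self-contained sketch. It is not the EnumerableStream from the question: MarkerFraming and its methods are hypothetical, and each element is a string written with BinaryWriter rather than a BinaryFormatter payload, but the loop condition is the same ReadByte() != -1 check.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical sketch of the "marker byte before each element" framing.
public static class MarkerFraming
{
    private const byte ElementMarker = 1; // any value works; only "not -1" matters

    public static void Write(Stream output, IEnumerable<string> items)
    {
        using (var writer = new BinaryWriter(output, Encoding.UTF8, leaveOpen: true))
        {
            foreach (var item in items)
            {
                writer.Write(ElementMarker); // 1 byte: "another element follows"
                writer.Write(item);          // the element itself (length-prefixed string)
            }
        }
    }

    public static List<string> Read(Stream input)
    {
        var items = new List<string>();
        using (var reader = new BinaryReader(input, Encoding.UTF8, leaveOpen: true))
        {
            // ReadByte returns -1 only at the real end of the data; otherwise it
            // consumes the marker and leaves the stream positioned at the element.
            // ReadString reads exactly the string's bytes, so mixing the two is safe here.
            while (input.ReadByte() != -1)
                items.Add(reader.ReadString());
        }
        return items;
    }
}
```

The cost is one extra byte per element, and the marker is consumed by the loop check, so the deserializer never sees it.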

Second, if you want to keep the current format, a more general solution would be to implement a custom stream that wraps another stream and implements a PeekByte method with the same semantics as the standard ReadByte, but without consuming the current byte:

using System;
using System.IO;

public class SequentialStream : Stream
{
    private Stream source;
    private bool leaveOpen;
    private int? nextByte;

    public SequentialStream(Stream source, bool leaveOpen = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
        this.source = source;
        this.leaveOpen = leaveOpen;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !leaveOpen)
            source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return 0;
        if (nextByte != null)
        {
            if (nextByte.Value < 0) return 0;
            buffer[offset] = (byte)nextByte.Value;
            if (count > 1)
            {
                int read = source.Read(buffer, offset + 1, count - 1);
                if (read == 0)
                    nextByte = -1;
                else
                    nextByte = null;
                return read + 1;
            }
            else
            {
                nextByte = null;
                return 1;
            }
        }
        else
        {
            int read = source.Read(buffer, offset, count);
            if (read == 0)
                nextByte = -1;
            return read;
        }
    }
} 

This basically implements a read-only, forward-only stream with 0 or 1 byte of read-ahead.

The usage will be like this:

using (var stream = new SequentialStream(GetStream(ListToSend)))
{
    // ...
    while (stream.PeekByte() != -1) 
    {
        // ...
    }
    // ...
}

P.S. Regarding:

I also want to know why, when I put a breakpoint in the Read function, the buffer size changes randomly.

It's not random. BinaryFormatter internally uses BinaryReader to read typed values like Int32, Byte, String, etc., passing the desired size as count: e.g. 4, 1, or the number of encoded bytes of the string (which it knows because that length is stored in the stream before the actual data, and it reads the length before trying to read the data).
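You can observe this with a small diagnostic wrapper that records the count argument of each Read call. This is a hedged sketch: CountRecordingStream and ReadCountDemo are hypothetical names, and it drives BinaryReader directly rather than BinaryFormatter, so the exact sequence of counts depends on the runtime's BinaryReader implementation.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical diagnostic wrapper: records the count of every Read(byte[], int, int) call.
public sealed class CountRecordingStream : Stream
{
    private readonly Stream _inner;
    public List<int> RequestedCounts { get; } = new List<int>();

    public CountRecordingStream(Stream inner) { _inner = inner; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        RequestedCounts.Add(count);
        return _inner.Read(buffer, offset, count);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

public static class ReadCountDemo
{
    public static List<int> Run()
    {
        var data = new MemoryStream();
        using (var w = new BinaryWriter(data, Encoding.UTF8, leaveOpen: true))
        {
            w.Write(42);       // Int32: 4 bytes
            w.Write((byte)7);  // Byte: 1 byte
            w.Write("hello");  // length prefix, then the string's UTF-8 bytes
        }
        data.Position = 0;

        var recording = new CountRecordingStream(data);
        using (var r = new BinaryReader(recording))
        {
            r.ReadInt32();
            r.ReadByte();
            r.ReadString();
        }
        // Typically a 4 for the Int32, 1 for single bytes, then the string's byte count.
        return recording.RequestedCounts;
    }
}
```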

Cunningham answered 29/8, 2018 at 17:54 Comment(12)
Do you think this will work with my WCF consumer? I have not had time to check whether it works; I will give it a try later. Basically, I have something like using (var stream = Channel.GetDataAsStream(request.Provider, request.Query)) { ... in my client. The stream returned by WCF is a System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream. – Overcash
The second approach, for sure. Just wrap the stream that you get from WCF, e.g. using (var stream = new SequentialStream(Channel.GetDataAsStream(request.Provider, request.Query))) { ... } – Cunningham
There's something I don't understand. Should SequentialStream be used on both the server and the client? Or do I first stream the EnumerableStream from the server, let it go through WCF, get a MessageBodyStream on the client side, and then wrap that WCF stream in the SequentialStream as you suggested? – Overcash
Correct. It's used to wrap any stream that you want to deserialize with an end-of-stream check, like in your sample code. This means that in your case it will actually be used on the client side. The WCF infrastructure already correctly handles your EnumerableStream<T> on the server side, so no modification is needed there. – Cunningham
Thank you! I will give it a try now and get back to you. – Overcash
It's working great! However, do you think it adds any performance overhead? – Overcash
It's just a wrapper that simply delegates most of the Read calls to the underlying stream, so the impact should be negligible. The only additional (small) overhead is when you call PeekByte, but that happens only once per enumerable element and, again, should be negligible compared to the overall deserialization process. – Cunningham
Thank you for your help; I will give you the bounty points in 16 hours :) – Overcash
You are welcome, glad it helped :) I would suggest keeping the bounty open until the end of the bounty period. This way your question can get more upvotes (and more answers, even better, who knows). Cheers. – Cunningham
I will surely create another question that may be related to this one. It's related to WCF: streaming in WCF is much slower than buffering the data. – Overcash
Good, I will follow your advice and keep it open until the end. – Overcash
I linked the current question/answer to that question. It's related to WCF streaming. Here's the link: https://mcmap.net/q/751146/-wcf-streaming-mode-is-really-slow/1504370 – Overcash

First off, you can simply serialize the List<List<string>> itself (demo: dotnetfiddle.net/TIqwVx). This eliminates the need for a specialized class to read the stream, and potentially renders the rest of this answer moot. The only reason to stream items one at a time would be a potentially very large dataset. That case needs a different implementation, which the following solution could potentially address.

The following answer (and your code) requires that the client reading the stream has the EnumerableStream class.

Do I need to add something in the custom stream itself to notify that all the data have been read?

Yes. You need to implement a new property that tells you whether there is another T to read, or repurpose Length.

public bool HasMore { get { return _buf.Any() || SerializeNext();} }

or

public override long Length { get { return (_buf.Any() || SerializeNext()) ? 1 : 0; } }

I feel like this whole solution can be cleaned up to have an IEnumerable<T> StreamReader. However, this works.

Here is the adjusted and working fiddle. Note that I also cleaned it up a bit; the static class with the same name as the other class was causing me a headache ;). Also, I would use byte[] rather than List<byte>.
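Since the EnumerableStream itself lives only in the fiddle, the following is a hypothetical, minimal stand-in (MiniEnumerableStream<T>, assumed to mirror the _source/_buf/SerializeNext structure referenced in this answer) showing where HasMore fits. As the comments point out, the client must know this concrete type to call HasMore, so it does not help when WCF hands back its own stream class.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical minimal stand-in for the question's EnumerableStream: serializes one
// element at a time into a byte queue as the reader asks for data.
public sealed class MiniEnumerableStream<T> : Stream
{
    private readonly IEnumerator<T> _source;
    private readonly Func<T, byte[]> _serializer;
    private readonly Queue<byte> _buf = new Queue<byte>();

    public MiniEnumerableStream(IEnumerable<T> source, Func<T, byte[]> serializer)
    {
        _source = source.GetEnumerator();
        _serializer = serializer;
    }

    // True while buffered bytes remain or another element can be serialized.
    // The loop also skips elements that serialize to zero bytes.
    public bool HasMore
    {
        get
        {
            while (_buf.Count == 0)
                if (!SerializeNext())
                    return false;
            return true;
        }
    }

    private bool SerializeNext()
    {
        if (!_source.MoveNext())
            return false;
        foreach (var b in _serializer(_source.Current))
            _buf.Enqueue(b);
        return true;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int read = 0;
        while (read < count && HasMore)
            buffer[offset + read++] = _buf.Dequeue();
        return read; // 0 only when the source is exhausted (or count is 0)
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) _source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
```

A consumer then loops with while (stream.HasMore) { ...deserialize one element... }. The Length variant from this answer would amount to returning HasMore ? 1 : 0 from Length instead.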

Is it because the formats of the deserializer and serializer are not the same? (I don't think so.)

No.

I also want to know why, when I put a breakpoint in the Read function, the buffer size changes randomly.

The buffer _buf should be the size of the current serialized item, which can vary from item to item.

I would prefer not to wrap the code in a try/catch; I want a clean solution that does not crash.

You are wise to not take the approach of just swallowing the exception, but instead understanding how to make it work as intended.

Dainedainty answered 29/8, 2018 at 13:48 Comment(14)
The documentation of Stream.CanRead indicates a different meaning: "When overridden in a derived class, gets a value indicating whether the current stream supports reading." It would be wise to move that detection somewhere else. – Maurreen
@TheSoftwareJedi: the link is not working. And by the way, I thought of using the CanRead property, but it does not work as you described. The property only indicates whether a custom stream supports the Read function. It's a read-only property. learn.microsoft.com/en-us/dotnet/api/… – Overcash
@TheSoftwareJedi: I see that it's working well! However, do you think it's a good idea to use that property (CanRead) like this? That property is not meant to signal when to stop reading. What do you think? – Overcash
Yeah, in hindsight, looking at the docs, it is a bit of a hack. Add another property such as "HasMore" and use that? I'm still playing with it here. – Dainedainty
@TheSoftwareJedi: The problem is that I'm using it between a client and a server. The server streams the data, and then the client consumes it. The client does not know the type of that custom Stream, so HasMore will not be available to it. It only knows that it's a Stream. – Overcash
@Overcash See the update; I updated the link as well. – Dainedainty
@Overcash The client will need to have this EnumerableStream<T> class available, though. Without the binary, it won't work. So they could cast it, or you can hijack Length. – Dainedainty
@TheSoftwareJedi: I know, I already tried that solution in the past, but unfortunately my requirements do not allow me to add the class on the client side... – Overcash
@TheSoftwareJedi: I don't know if you have any experience with WCF streaming. Basically, I created this sample to demonstrate what I'm trying to achieve with WCF streaming. The client will receive a System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream. Maybe I need to create another question. – Overcash
@TheSoftwareJedi: If I apply the stop condition to Length or CanRead, WCF seems to ignore it. – Overcash
@Overcash The client isn't using this new Stream class, so yes, what you are trying to do will not work unless the client uses the new Stream class. – Dainedainty
@Overcash You simply need to serialize the List<List<T>> and deserialize the List<List<T>>. Then this will work with no additional code. – Dainedainty
@Overcash I added a first sentence to this answer. It demos simply serializing List<List<string>> as a whole: dotnetfiddle.net/TIqwVx – Dainedainty
@TheSoftwareJedi: Thank you a lot for your help and your time; I was able to fix my issue with the help of Ivan Stoev. – Overcash

Implement the Length property:

public override long Length 
{
    get 
    {
        return (_buf.Any() || SerializeNext()) ? 1 : 0;
    } 
}

Then check the length:

while (stream.Length > 0)
{
    List<string> row = formatter.Deserialize(stream) as List<string>;
    ListToReceive.Add(row);
}

I have tested this in your fiddle, and it works well.

It is very similar to @TheSoftwareJedi's solution, but makes use of the Length property, which in this case reflects the elements you "know" are in the stream (1 while at least one more element is available, 0 otherwise). As far as I can see, this is not against the intended use of this property.

Maurreen answered 29/8, 2018 at 13:31 Comment(6)
People have already posted answers like this, and they were deleted because they do not answer the question. This is what I do for the moment to bypass the error. I want a solution that does not crash. – Overcash
@Overcash If you're the one flagging them, you may want to leave at least one with a comment on why it does not address your need. (Seen from review, so I may not have the context.) – Kuenlun
@JoshuaDrake: It does not give me any text box to enter a comment when I flag. I chose the "Not an answer" flag option. – Overcash
@Overcash Please see: What To Flag – Kuenlun
I have replaced my answer with a completely new solution. – Maurreen
@Chris: Your solution works well in the example that I gave. However, it will not work in my concrete scenario, because I'm using WCF streaming. WCF gives me a System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream, which does not implement the Length property. – Overcash
