IEnumerable to Stream
T

6

26

I would like to do something roughly equivalent to the code example below. I want to generate and serve a stream of data without necessarily having the entire data set in memory at any one time.

It seems like I would need some implementation of Stream that accepts an IEnumerable<string> (or IEnumerable<byte>) in its constructor. Internally this Stream would only walk the IEnumerable as the Stream is being read or as needed. But I don't know of any Stream implementation like this.

Am I on the right track? Do you know of any way to do something like this?

    public FileStreamResult GetResult()
    {
        IEnumerable<string> data = GetDataForStream();

        Stream dataStream = ToStringStream(Encoding.UTF8, data);

        return File(dataStream, "text/plain", "Result");
    }

    private IEnumerable<string> GetDataForStream()
    {
        StringBuilder sb;
        for (int i = 0; i < 10000; i++)
        {
            yield return i.ToString();
            yield return "\r\n";
        }
    }

    private Stream ToStringStream(Encoding encoding, IEnumerable<string> data)
    {
        // I have to write my own implementation of stream?
        throw new NotImplementedException();
    }
Thackeray answered 26/2, 2014 at 16:49 Comment(5)
Can you guarantee that the stream will be read sequentially in a single pass? - Beneficent
@Beneficent You can just set CanSeek to false if you can't support seeking, and then simply throw if Seek is called. - Eponymous
@Beneficent - I think I can guarantee that. My intent is to give the stream to an MVC FileStreamResult, as in my example. - Thackeray
Looking into this further, this may just be a bad idea. Can this work without me implementing the Stream Length property? - Thackeray
@Thackeray That depends entirely on whether the reader relies on it. - Eponymous
M
10

Here's a read-only Stream implementation that uses an IEnumerable<byte> as input:

public class ByteStream : Stream
{
    private readonly IEnumerator<byte> _input;
    private bool _disposed;

    public ByteStream(IEnumerable<byte> input)
    {
        _input = input.GetEnumerator();
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    // The length is unknown until the sequence has been fully enumerated.
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Pull bytes from the enumerator only as the consumer asks for them.
        int i = 0;
        for (; i < count && _input.MoveNext(); i++)
            buffer[i + offset] = _input.Current;
        return i;
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    // Nothing to flush on a read-only stream.
    public override void Flush() { }

    // Stream already implements IDisposable; override Dispose(bool) instead of re-implementing it.
    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            _disposed = true;
            if (disposing)
                _input.Dispose();
        }
        base.Dispose(disposing);
    }
}

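You then still need a function that converts IEnumerable<string> to IEnumerable<byte>. Note that the one below appends a line break after every item, so the input should yield lines without their own line breaks (unlike GetDataForStream in the question, which yields "\r\n" as separate items):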

public static IEnumerable<byte> Encode(IEnumerable<string> input, Encoding encoding)
{
    byte[] newLine = encoding.GetBytes(Environment.NewLine);
    foreach (string line in input)
    {
        byte[] bytes = encoding.GetBytes(line);
        foreach (byte b in bytes)
            yield return b;
        foreach (byte b in newLine)
            yield return b;
    }
}
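
To see that nothing is encoded ahead of time, here is a small sketch (not part of the original answer). It assumes a hypothetical counting source, Source(), and a copy of an Encode function with the same shape as above; the class name LazyEncodeDemo and the Produced counter are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class LazyEncodeDemo
{
    // How many strings the source has yielded so far (illustration only).
    public static int Produced;

    // Hypothetical source that counts each string it hands out.
    public static IEnumerable<string> Source()
    {
        for (int i = 0; i < 10000; i++)
        {
            Produced++;
            yield return "line " + i;
        }
    }

    // Same shape as the Encode function above.
    public static IEnumerable<byte> Encode(IEnumerable<string> input, Encoding encoding)
    {
        byte[] newLine = encoding.GetBytes(Environment.NewLine);
        foreach (string line in input)
        {
            foreach (byte b in encoding.GetBytes(line))
                yield return b;
            foreach (byte b in newLine)
                yield return b;
        }
    }

    public static void Main()
    {
        // Ask for three bytes only; the source is advanced just far enough to supply them.
        byte[] firstThree = Encode(Source(), Encoding.UTF8).Take(3).ToArray();
        Console.WriteLine(Encoding.UTF8.GetString(firstThree)); // prints "lin"
        Console.WriteLine(Produced);                            // prints 1
    }
}
```

Only the first string is ever produced, because iterator blocks run on demand. The same holds when ByteStream.Read drives the enumerator.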

And finally, here's how to use this in your controller:

public FileResult GetResult()
{
    IEnumerable<string> data = GetDataForStream();
    var stream = new ByteStream(Encode(data, Encoding.UTF8));
    return File(stream, "text/plain", "Result.txt");
}
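
If the per-byte enumerator becomes a bottleneck, one possible variation is to encode one string at a time and copy it into the caller's buffer in blocks. The following is only a sketch under that assumption; the class name StringSequenceStream is hypothetical. Unlike Encode above, it does not append line breaks, so it suits sources such as the question's GetDataForStream that already yield them.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical name; a sketch, not the answer's implementation.
public sealed class StringSequenceStream : Stream
{
    private readonly IEnumerator<string> _source;
    private readonly Encoding _encoding;
    private byte[] _pending = new byte[0]; // encoded bytes of the current string
    private int _pendingPos;               // next unread index in _pending

    public StringSequenceStream(IEnumerable<string> source, Encoding encoding)
    {
        _source = source.GetEnumerator();
        _encoding = encoding;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int written = 0;
        while (written < count)
        {
            if (_pendingPos == _pending.Length)
            {
                // Current string is used up: encode the next one, or stop at the end.
                if (!_source.MoveNext()) break;
                _pending = _encoding.GetBytes(_source.Current);
                _pendingPos = 0;
                continue; // re-check, because the new string may be empty
            }
            int n = Math.Min(count - written, _pending.Length - _pendingPos);
            Buffer.BlockCopy(_pending, _pendingPos, buffer, offset + written, n);
            _pendingPos += n;
            written += n;
        }
        return written; // 0 only when the sequence is exhausted (or count is 0)
    }

    public override void Flush() { } // nothing to flush on a read-only stream
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing) _source.Dispose();
        base.Dispose(disposing);
    }
}

public static class StringSequenceStreamDemo
{
    public static void Main()
    {
        var parts = new[] { "alpha", "\r\n", "", "beta", "\r\n" };
        using (var stream = new StringSequenceStream(parts, Encoding.UTF8))
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            Console.Write(reader.ReadToEnd());
        }
    }
}
```

At most one encoded string is held in memory at a time, so the trade-off is one byte[] allocation per string in exchange for block copies instead of a MoveNext call per byte.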
Mabel answered 6/3, 2019 at 9:14 Comment(0)
H
6

I created a class called ProducerConsumerStream that does this. The producer writes data to the stream and the consumer reads. There's a buffer in the middle so that the producer can "write ahead" a little bit. You can define the size of the buffer.

Anyway, if it's not exactly what you're looking for, I suspect it will give you a good idea of how it's done. See Building a new type of stream.

Update

The link went stale, so I've copied my code here. The original article is still available on the Wayback Machine at https://web.archive.org/web/20151210235510/http://www.informit.com/guides/content.aspx?g=dotnet&seqNum=852

First, the ProducerConsumerStream class:

using System;
using System.IO;
using System.Threading;
using System.Diagnostics;

namespace Mischel.IO
{
    // This class is safe for 1 producer and 1 consumer.
    public class ProducerConsumerStream : Stream
    {
        private byte[] CircleBuff;
        private int Head;
        private int Tail;

        public bool IsAddingCompleted { get; private set; }
        public bool IsCompleted { get; private set; }

        // For debugging
        private long TotalBytesRead = 0;
        private long TotalBytesWritten = 0;

        public ProducerConsumerStream(int size)
        {
            CircleBuff = new byte[size];
            Head = 1;
            Tail = 0;
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string msg)
        {
            Console.WriteLine(msg);
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string fmt, params object[] parms)
        {
            DebugOut(string.Format(fmt, parms));
        }

        private int ReadBytesAvailable
        {
            get
            {
                if (Head > Tail)
                    return Head - Tail - 1;
                else
                    return CircleBuff.Length - Tail + Head - 1;
            }
        }

        private int WriteBytesAvailable { get { return CircleBuff.Length - ReadBytesAvailable - 1; } }

        private void IncrementTail()
        {
            Tail = (Tail + 1) % CircleBuff.Length;
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (disposed)
            {
                // The single-string constructor takes the object name, not a message.
                throw new ObjectDisposedException(nameof(ProducerConsumerStream));
            }
            if (IsCompleted)
            {
                // Per the Stream contract, reading at the end of the stream returns 0 rather than throwing.
                return 0;
            }
            if (count == 0)
            {
                return 0;
            }

            lock (CircleBuff)
            {
                DebugOut("Read: requested {0:N0} bytes. Available = {1:N0}.", count, ReadBytesAvailable);
                while (ReadBytesAvailable == 0)
                {
                    if (IsAddingCompleted)
                    {
                        IsCompleted = true;
                        return 0;
                    }
                    Monitor.Wait(CircleBuff);
                }

                // If Head < Tail, then there are bytes available at the end of the buffer
                // and also at the front of the buffer.
                // If reading from Tail to the end doesn't fulfill the request,
                // and there are still bytes available,
                // then read from the start of the buffer.
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                IncrementTail();
                int bytesToRead;
                if (Tail > Head)
                {
                    // When Tail > Head, we know that there are at least
                    // (CircleBuff.Length - Tail) bytes available in the buffer.
                    bytesToRead = CircleBuff.Length - Tail;
                }
                else
                {
                    bytesToRead = Head - Tail;
                }

                // Don't read more than count bytes!
                bytesToRead = Math.Min(bytesToRead, count);

                Buffer.BlockCopy(CircleBuff, Tail, buffer, offset, bytesToRead);
                Tail += (bytesToRead - 1);
                int bytesRead = bytesToRead;

                // At this point, either we've exhausted the buffer,
                // or Tail is at the end of the buffer and has to wrap around.
                if (bytesRead < count && ReadBytesAvailable > 0)
                {
                    // We haven't fulfilled the read.
                    IncrementTail();
                    // Tail is always equal to 0 here.
                    bytesToRead = Math.Min((count - bytesRead), (Head - Tail));
                    Buffer.BlockCopy(CircleBuff, Tail, buffer, offset + bytesRead, bytesToRead);
                    bytesRead += bytesToRead;
                    Tail += (bytesToRead - 1);
                }

                TotalBytesRead += bytesRead;
                DebugOut("Read: returning {0:N0} bytes. TotalRead={1:N0}", bytesRead, TotalBytesRead);
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                Monitor.Pulse(CircleBuff);
                return bytesRead;
            }
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(ProducerConsumerStream));
            }
            if (IsAddingCompleted)
            {
                throw new InvalidOperationException("The stream has been marked as complete for adding.");
            }
            lock (CircleBuff)
            {
                DebugOut("Write: requested {0:N0} bytes. Available = {1:N0}", count, WriteBytesAvailable);
                int bytesWritten = 0;
                while (bytesWritten < count)
                {
                    while (WriteBytesAvailable == 0)
                    {
                        Monitor.Wait(CircleBuff);
                    }
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    int bytesToCopy = Math.Min((count - bytesWritten), WriteBytesAvailable);
                    CopyBytes(buffer, offset + bytesWritten, bytesToCopy);
                    TotalBytesWritten += bytesToCopy;
                    DebugOut("Write: {0} bytes written. TotalWritten={1:N0}", bytesToCopy, TotalBytesWritten);
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    bytesWritten += bytesToCopy;
                    Monitor.Pulse(CircleBuff);
                }
            }
        }


        private void CopyBytes(byte[] buffer, int srcOffset, int count)
        {
            // Insert at head
            // The copy might require two separate operations.

            // copy as much as can fit between Head and end of the circular buffer
            int offset = srcOffset;
            int bytesCopied = 0;
            int bytesToCopy = Math.Min(CircleBuff.Length - Head, count);
            if (bytesToCopy > 0)
            {
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                bytesCopied = bytesToCopy;
                Head = (Head + bytesToCopy) % CircleBuff.Length;
                offset += bytesCopied;
            }

            // Copy the remainder, which will go from the beginning of the buffer.
            if (bytesCopied < count)
            {
                bytesToCopy = count - bytesCopied;
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                Head = (Head + bytesToCopy) % CircleBuff.Length;
            }
        }

        public void CompleteAdding()
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(ProducerConsumerStream));
            }
            lock (CircleBuff)
            {
                DebugOut("CompleteAdding: {0:N0} bytes written.", TotalBytesWritten);
                IsAddingCompleted = true;
                Monitor.Pulse(CircleBuff);
            }
        }

        public override bool CanRead { get { return true; } }

        public override bool CanSeek { get { return false; } }

        public override bool CanWrite { get { return true; } }

        public override void Flush() { /* does nothing */ }

        public override long Length { get { throw new NotImplementedException(); } }

        public override long Position
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        private bool disposed = false;

        protected override void Dispose(bool disposing)
        {
            if (!disposed)
            {
                base.Dispose(disposing);
                disposed = true;
            }
        }
    }
}

And an example of how to use it:

using System;
using System.Text;
using System.Threading;
using Mischel.IO;

class Program
{
    static readonly string TestText = "This is a test of the emergency broadcast system.";
    static readonly byte[] TextBytes = Encoding.UTF8.GetBytes(TestText);

    const int Megabyte = 1024 * 1024;

    const int TestBufferSize = 12;

    const int ProducerBufferSize = 4;
    const int ConsumerBufferSize = 5;

    static void Main(string[] args)
    {
        Console.WriteLine("TextBytes contains {0:N0} bytes.", TextBytes.Length);
        using (var pcStream = new ProducerConsumerStream(TestBufferSize))
        {
            Thread ProducerThread = new Thread(ProducerThreadProc);
            Thread ConsumerThread = new Thread(ConsumerThreadProc);
            ProducerThread.Start(pcStream);
            Thread.Sleep(2000);
            ConsumerThread.Start(pcStream);

            ProducerThread.Join();
            ConsumerThread.Join();
        }
        Console.Write("Done. Press Enter.");
        Console.ReadLine();
    }

    static void ProducerThreadProc(object state)
    {
        Console.WriteLine("Producer: started.");
        var pcStream = (ProducerConsumerStream)state;
        int offset = 0;
        // Loop over the byte count, not the character count: the two differ for non-ASCII text.
        while (offset < TextBytes.Length)
        {
            int bytesToWrite = Math.Min(ProducerBufferSize, TextBytes.Length - offset);
            pcStream.Write(TextBytes, offset, bytesToWrite);
            offset += bytesToWrite;
        }
        pcStream.CompleteAdding();
        Console.WriteLine("Producer: {0:N0} total bytes written.", offset);
        Console.WriteLine("Producer: exit.");
    }

    static void ConsumerThreadProc(object state)
    {
        Console.WriteLine("Consumer: started.");
        var instream = (ProducerConsumerStream)state;
        int testOffset = 0;

        var inputBuffer = new byte[TextBytes.Length];

        int bytesRead;
        do
        {
            int bytesToRead = Math.Min(ConsumerBufferSize, inputBuffer.Length - testOffset);
            bytesRead = instream.Read(inputBuffer, testOffset, bytesToRead);
            //Console.WriteLine("Consumer: {0:N0} bytes read.", bytesRead);
            testOffset += bytesRead;
        } while (bytesRead != 0);
        Console.WriteLine("Consumer: {0:N0} total bytes read.", testOffset);

        // Compare bytes read with TextBytes
        for (int i = 0; i < TextBytes.Length; ++i)
        {
            if (inputBuffer[i] != TextBytes[i])
            {
                Console.WriteLine("Read error at position {0}", i);
                break;
            }
        }
        Console.WriteLine("Consumer: exit.");
    }
}
Heracles answered 26/2, 2014 at 17:30 Comment(2)
@JimMischel let me know too so I can update my answer; it's still in the Wayback Machine: web.archive.org/web/20151210235510/http://www.informit.com/… - Glioma
@drzaus: I copied the code from my article to the answer here. - Heracles
I
1

I had the same problem. In my case, a third-party package only accepts streams, but I have an IEnumerable<string>. I couldn't find an answer online, so I wrote my own (a TextReader rather than a Stream), which I'll share:

public class IEnumerableStringReader : TextReader
{
    private readonly IEnumerator<string> _enumerator;

    private bool eof = false; // is set to true when .MoveNext tells us there is no more data.
    private char[] curLine = null;
    private int curLinePos = 0;

    private bool disposed = false;

    public IEnumerableStringReader(IEnumerable<string> input)
    {
        _enumerator = input.GetEnumerator();
    }

    private void GetNextLine()
    {
        if (eof) return;

        eof = !_enumerator.MoveNext();
        if (eof) return;

        curLine = $"{_enumerator.Current}\r\n" // IEnumerable<string> input implies newlines exist between the lines.
            .ToCharArray();

        curLinePos = 0;
    }

    public override int Peek()
    {
        if (disposed) throw new ObjectDisposedException(nameof(IEnumerableStringReader));

        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return -1;

        return curLine[curLinePos];
    }

    public override int Read()
    {
        if (disposed) throw new ObjectDisposedException(nameof(IEnumerableStringReader));

        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return -1;

        return curLine[curLinePos++];
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (disposed) throw new ObjectDisposedException(nameof(IEnumerableStringReader));
        if (count == 0) return 0;

        int charsReturned = 0;
        // Assuming we don't run out of input chars, return count characters if we can.
        // If the space left in the buffer is not big enough, return as many as will fit.
        int maxChars = Math.Min(count, buffer.Length - index);

        while (charsReturned < maxChars)
        {
            if (curLine == null || curLinePos == curLine.Length) GetNextLine();
            if (eof) return charsReturned;

            int maxCurrentCopy = maxChars - charsReturned;
            int charsAtTheReady = curLine.Length - curLinePos; // chars available in current line                
            int copySize = Math.Min(maxCurrentCopy, charsAtTheReady); // stop at end of buffer.

            // can't use Buffer.BlockCopy because it's byte based and we're dealing with chars.
            Array.ConstrainedCopy(curLine, curLinePos, buffer, index, copySize);

            index += copySize;
            curLinePos += copySize;
            charsReturned += copySize;
        }

        return charsReturned;
    }

    public override string ReadLine()
    {
        if (disposed) throw new ObjectDisposedException(nameof(IEnumerableStringReader));

        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return null;

        if (curLinePos > 0) // this is necessary in case the client uses both Read() and ReadLine() calls
        {
            // Return the remainder of the line without the CRLF that GetNextLine appends.
            // Math.Max guards the case where Read() has already consumed the '\r'.
            int remaining = Math.Max(0, curLine.Length - curLinePos - 2);
            var tmp = new string(curLine, curLinePos, remaining);
            curLinePos = curLine.Length; // so the next call fetches a new line
            return tmp;
        }

        // read full line.
        curLinePos = curLine.Length; // so the next call fetches a new line
        return _enumerator.Current; // if all the client does is call ReadLine this (faster) code path will be taken.
    }

    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            _enumerator.Dispose();
            base.Dispose(disposing);
            disposed = true;
        }
    }
}

In my case, I want to use it as input to Datastreams.Csv:

using (var tr = new IEnumerableStringReader(input))
using (var reader = new CsvReader(tr))
{
  while (reader.ReadRecord())
  {
    // do whatever
  }
}
Ira answered 30/8, 2017 at 20:51 Comment(0)
A
1

Using the EnumerableToStream NuGet package, you would implement your method like so:

using EnumerableToStream;

private Stream ToStringStream(Encoding encoding, IEnumerable<string> data)
{
    return data.ToStream(encoding);
}

I had the same requirement and ended up rolling my own implementation which I have been using for a while now. Getting all the nitty-gritty details just right took some time and effort. For instance, you want your IEnumerable to be disposed after the stream is read to the end and you don't want multibyte characters to be partially written to the buffer.

In this particular implementation, reading the stream does zero allocations, unlike other implementations using encoding.GetBytes(line).

After seeing this question, I decided to release the code as a NuGet package. Hope it saves you a few hours. The source code is on GitHub.

Akan answered 30/1, 2022 at 19:55 Comment(0)
I
0

Steve Sadler wrote a perfectly working answer. However, he makes it more difficult than needed.

According to the reference source of TextReader, you only need to override Peek and Read:

A subclass must minimally implement the Peek() and Read() methods.

So first I write a function that converts IEnumerable<string> into IEnumerable<char> where a new line is added at the end of each string:

private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
{
    foreach (string line in lines)
    {
        foreach (char c in line + Environment.NewLine)
        {
            yield return c;
        }
    }
}

Environment.NewLine is the part that adds the new line at the end of each string.

Now the class is fairly straightforward:

class EnumStringReader : TextReader
{
    public EnumStringReader(IEnumerable<string> lines)
    {
        this.enumerator = ReadCharacters(lines).GetEnumerator();
        this.dataAvailable = this.enumerator.MoveNext();
    }
    private bool disposed = false;
    private bool dataAvailable;
    private readonly IEnumerator<char> enumerator;

The constructor takes a sequence of lines to read. It uses this sequence and the earlier written function to convert the sequence into a sequence of characters with the added Environment.NewLine.

It gets the enumerator of the converted sequence and moves to the first character. It remembers in dataAvailable whether there is a first character.

Now we are ready to Peek: if no data available: return -1, otherwise return the current character as int. Do not move forward:

public override int Peek()
{
    this.ThrowIfDisposed();
    return this.dataAvailable ? this.enumerator.Current : -1;
}

Read: if no data available, return -1, otherwise return the current character as int. Move forward to the next character and remember whether there is data available:

public override int Read()
{
    this.ThrowIfDisposed();
    if (this.dataAvailable)
    {
        char nextChar = this.enumerator.Current;
        this.dataAvailable = this.enumerator.MoveNext();
        return (int)nextChar;
    }
    else
    {
        return -1;
    }
}

Don't forget to override Dispose(bool) where you dispose the enumerator.
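
The snippets above call ThrowIfDisposed but never show it, and the Dispose(bool) override is left to the reader. A minimal sketch of the complete class could look like this, assuming ThrowIfDisposed is a small private helper (that body is a guess, not the author's code):

```csharp
using System;
using System.Collections.Generic;
using System.IO;

class EnumStringReader : TextReader
{
    private readonly IEnumerator<char> enumerator;
    private bool dataAvailable;
    private bool disposed = false;

    public EnumStringReader(IEnumerable<string> lines)
    {
        this.enumerator = ReadCharacters(lines).GetEnumerator();
        this.dataAvailable = this.enumerator.MoveNext();
    }

    private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
    {
        foreach (string line in lines)
            foreach (char c in line + Environment.NewLine)
                yield return c;
    }

    public override int Peek()
    {
        this.ThrowIfDisposed();
        return this.dataAvailable ? this.enumerator.Current : -1;
    }

    public override int Read()
    {
        this.ThrowIfDisposed();
        if (!this.dataAvailable) return -1;
        char nextChar = this.enumerator.Current;
        this.dataAvailable = this.enumerator.MoveNext();
        return nextChar;
    }

    // Assumed helper: the answer calls it but does not define it.
    private void ThrowIfDisposed()
    {
        if (this.disposed)
            throw new ObjectDisposedException(nameof(EnumStringReader));
    }

    protected override void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            this.disposed = true;
            if (disposing) this.enumerator.Dispose(); // releases the underlying sequence
        }
        base.Dispose(disposing);
    }
}
```

One thing to be aware of: the constructor already calls MoveNext, so the first line of the source sequence is fetched when the reader is created, not on the first read.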

That is all that is needed. All other functions will use these two.

Now to fill your stream with the lines:

IEnumerable<string> lines = ...
using (TextWriter writer = System.IO.File.CreateText(...))
{
    using (TextReader reader = new EnumStringReader(lines))
    {
        // Pick ONE of the three alternatives below; each one consumes the reader to the end.

        // either write per char:
        while (reader.Peek() != -1)
        {
            char c = (char)reader.Read();
            writer.Write(c);
        }

        // or write per line:
        string line = reader.ReadLine();
        // line is without newLine!
        while (line != null)
        {
            writer.WriteLine(line);
            line = reader.ReadLine();
        }

        // or write per block:
        char[] buf = new char[4096];
        int nrRead = reader.ReadBlock(buf, 0, buf.Length);
        while (nrRead > 0)
        {
            writer.Write(buf, 0, nrRead);
            nrRead = reader.ReadBlock(buf, 0, buf.Length);
        }
    }
}
Iridescence answered 6/3, 2018 at 16:28 Comment(0)
B
0

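If you can put an upper bound on the length of each string in the IEnumerable, the solution can be quite simple. If the source is, e.g., a SQL row, the maximum length is just the sum of the column widths. Note that the bound must be in encoded bytes, not characters: UTF-8 can need up to three bytes per char.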

IEnumerable<string> input = //todo assign
int maxLength = 1024;
var stream = new BufferedStream(new EnumerableStream(input), maxLength);

public class EnumerableStream : Stream
{
    IEnumerator<string> enumerator;
    public EnumerableStream(IEnumerable<string> input) => this.enumerator = input.GetEnumerator();
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotImplementedException();
    public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
    public override void Flush() { } // nothing to flush on a read-only stream

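    public override int Read(byte[] buffer, int offset, int count)
    {
        // Skip empty strings: returning 0 would wrongly signal the end of the stream.
        while (enumerator.MoveNext())
        {
            string s = enumerator.Current;
            if (s.Length == 0) continue;
            // Each string must fit into a single read; that is why the caller sizes the buffer.
            if (Encoding.UTF8.GetByteCount(s) > count)
                throw new ArgumentException("buffer too small for the current string", nameof(count));
            return Encoding.UTF8.GetBytes(s, 0, s.Length, buffer, offset);
        }
        return 0;
    }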

    public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
    public override void SetLength(long value) => throw new NotImplementedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotImplementedException();
    protected override void Dispose(bool disposing)
    {
        enumerator.Dispose();
        base.Dispose(disposing);
    }
}
Bascomb answered 30/10, 2023 at 15:18 Comment(0)
