Converting IEnumerable to Stream

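I would like to do something roughly equivalent to the code example below. I want to generate and serve a stream of data without ever holding the whole data set in memory at once.
It looks like I need a Stream implementation that accepts an IEnumerable<string> (or IEnumerable<byte>) in its constructor. That Stream would only walk the IEnumerable as it is being read, when the data is actually needed. But I don't know whether any such Stream implementation exists.
Am I on the right track? Do you know how to do this?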
    public FileStreamResult GetResult()
    {
        IEnumerable<string> data = GetDataForStream();

        Stream dataStream = ToStringStream(Encoding.UTF8, data);

        return File(dataStream, "text/plain", "Result");
    }

    private IEnumerable<string> GetDataForStream()
    {
        for (int i = 0; i < 10000; i++)
        {
            yield return i.ToString();
            yield return "\r\n";
        }
    }

    private Stream ToStringStream(Encoding encoding, IEnumerable<string> data)
    {
        // Do I have to write my own Stream implementation?
        throw new NotImplementedException();
    }

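Can you guarantee that the stream will be read sequentially, in a single pass? - user629926
If you can't support seeking, just set CanSeek to false and throw an exception when Seek is called. - Servy
@user629926 - I think I can guarantee that. My intention is to pass the stream to an MVC FileStreamResult, as in my example. - Joel
After more research, it looks like this may just be a bad idea. Would this work if I don't implement the stream's Length property? - Joel
@Joel That depends entirely on whether the reader relies on it. - Servy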
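Servy's two comments can be made concrete. The sketch below is a hypothetical forward-only stream (RepeatByteStream is an illustrative name, not from the thread) that reports CanSeek = false and throws NotSupportedException from the seek-related members, plus a small helper (StreamLength.TryGetLength, also hypothetical) showing how a reader that does not want to rely on Length can guard against such streams:

```csharp
using System;
using System.IO;

// Hypothetical example: a forward-only stream that yields `count` copies of one byte.
// CanSeek is false, and every seek-related member throws NotSupportedException.
public sealed class RepeatByteStream : Stream
{
    private readonly byte _value;
    private long _remaining;

    public RepeatByteStream(byte value, long count)
    {
        _value = value;
        _remaining = count;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    public override int Read(byte[] buffer, int offset, int count)
    {
        int n = (int)Math.Min(count, _remaining);
        buffer.AsSpan(offset, n).Fill(_value);
        _remaining -= n;
        return n; // 0 once the data is exhausted, as the Stream contract requires
    }

    // Seek-related members: not supported on a forward-only stream.
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    // Read-only: writing is not supported; Flush is a harmless no-op.
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override void Flush() { }
}

public static class StreamLength
{
    // Returns the length when it is known, or null for forward-only streams
    // whose Length getter is expected to throw.
    public static long? TryGetLength(Stream stream) =>
        stream.CanSeek ? stream.Length : (long?)null;
}
```

A consumer written this way works with both seekable and forward-only streams; a consumer that reads Length unconditionally will fail on the forward-only one.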
6 Answers


Here is a read-only Stream implementation that takes an IEnumerable<byte> as input:

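using System;
using System.Collections.Generic;
using System.IO;

public class ByteStream : Stream
{
    private readonly IEnumerator<byte> _input;
    private long _position;
    private bool _disposed;

    public ByteStream(IEnumerable<byte> input)
    {
        _input = input.GetEnumerator();
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    // A forward-only stream has no known length; throwing is the convention when CanSeek is false.
    public override long Length => throw new NotSupportedException();

    // Position reports the number of bytes read so far but cannot be changed.
    public override long Position
    {
        get => _position;
        set => throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(ByteStream));
        int i = 0;
        // Pull bytes from the enumerator only as the caller asks for them.
        for (; i < count && _input.MoveNext(); i++)
            buffer[offset + i] = _input.Current;
        _position += i;
        return i; // 0 signals end of stream
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override void Flush() { } // nothing to flush on a read-only stream

    // Override Dispose(bool) (Stream already implements IDisposable) so the enumerator is
    // released however the stream is closed: Dispose(), Close(), or a using block.
    protected override void Dispose(bool disposing)
    {
        if (!_disposed && disposing)
            _input.Dispose();
        _disposed = true;
        base.Dispose(disposing);
    }
}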

You also need a function that converts the IEnumerable<string> into an IEnumerable<byte>:
public static IEnumerable<byte> Encode(IEnumerable<string> input, Encoding encoding)
{
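    // Note: a newline is appended after every string. If the source already yields its own
    // line breaks (GetDataForStream in the question yields "\r\n" separately), remove the
    // newLine loop below to avoid doubled line breaks.
    byte[] newLine = encoding.GetBytes(Environment.NewLine);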
    foreach (string line in input)
    {
        byte[] bytes = encoding.GetBytes(line);
        foreach (byte b in bytes)
            yield return b;
        foreach (byte b in newLine)
            yield return b;
    }
}

Finally, here is how to use it in your controller:

public FileResult GetResult()
{
    IEnumerable<string> data = GetDataForStream();
    var stream = new ByteStream(Encode(data, Encoding.UTF8));
    return File(stream, "text/plain", "Result.txt");
}
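Pulling one byte per MoveNext call works, but every byte goes through the enumerator. A variant that encodes one string at a time into a pending buffer and copies as many bytes as each Read asks for might look like the sketch below. StringEnumerableStream is a hypothetical name; it assumes each string can be encoded on its own, which holds for UTF-8 unless a surrogate pair is split across two strings. It also disposes the source as soon as it is exhausted.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical alternative to ByteStream: encodes one string at a time and copies
// as many bytes per Read call as the caller asks for.
public sealed class StringEnumerableStream : Stream
{
    private readonly IEnumerator<string> _source;
    private readonly Encoding _encoding;
    private byte[] _pending = Array.Empty<byte>();
    private int _pendingPos;
    private bool _sourceDone;

    public StringEnumerableStream(IEnumerable<string> source, Encoding encoding)
    {
        _source = source.GetEnumerator();
        _encoding = encoding;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    public override int Read(byte[] buffer, int offset, int count)
    {
        int written = 0;
        while (written < count)
        {
            if (_pendingPos == _pending.Length)
            {
                if (_sourceDone) break;
                if (!_source.MoveNext())
                {
                    _sourceDone = true;
                    _source.Dispose(); // release the source as soon as it is exhausted
                    break;
                }
                _pending = _encoding.GetBytes(_source.Current ?? string.Empty);
                _pendingPos = 0;
                continue; // an empty string contributes no bytes; fetch the next one
            }
            int n = Math.Min(count - written, _pending.Length - _pendingPos);
            Buffer.BlockCopy(_pending, _pendingPos, buffer, offset + written, n);
            _pendingPos += n;
            written += n;
        }
        return written; // 0 only when the source is exhausted (or count was 0)
    }

    public override void Flush() { }
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing && !_sourceDone)
        {
            _sourceDone = true;
            _source.Dispose();
        }
        base.Dispose(disposing);
    }
}
```

In the question's controller this could stand in for ToStringStream, e.g. `return File(new StringEnumerableStream(GetDataForStream(), Encoding.UTF8), "text/plain", "Result.txt");`. Unlike Encode above, it adds no newline of its own, which suits GetDataForStream because that method already yields "\r\n".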

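I created a class called ProducerConsumerStream that does this. The producer writes data to the stream and the consumer reads it. There is a buffer in between, so the producer can "write ahead" a little. You can define the size of the buffer.
Even if it isn't exactly what you're looking for, I think it will give you a good idea. See Building a New Type of Stream.
Update
The link has expired, so I've copied my code here. The original article is still available on the Wayback Machine: https://web.archive.org/web/20151210235510/http://www.informit.com/guides/content.aspx?g=dotnet&seqNum=852
First, the ProducerConsumerStream class: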
using System;
using System.IO;
using System.Threading;
using System.Diagnostics;

namespace Mischel.IO
{
    // This class is safe for 1 producer and 1 consumer.
    public class ProducerConsumerStream : Stream
    {
        private byte[] CircleBuff;
        private int Head;
        private int Tail;

        public bool IsAddingCompleted { get; private set; }
        public bool IsCompleted { get; private set; }

        // For debugging
        private long TotalBytesRead = 0;
        private long TotalBytesWritten = 0;

        public ProducerConsumerStream(int size)
        {
            CircleBuff = new byte[size];
            Head = 1;
            Tail = 0;
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string msg)
        {
            Console.WriteLine(msg);
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string fmt, params object[] parms)
        {
            DebugOut(string.Format(fmt, parms));
        }

        private int ReadBytesAvailable
        {
            get
            {
                if (Head > Tail)
                    return Head - Tail - 1;
                else
                    return CircleBuff.Length - Tail + Head - 1;
            }
        }

        private int WriteBytesAvailable { get { return CircleBuff.Length - ReadBytesAvailable - 1; } }

        private void IncrementTail()
        {
            Tail = (Tail + 1) % CircleBuff.Length;
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (disposed)
            {
                throw new ObjectDisposedException("The stream has been disposed.");
            }
            if (IsCompleted)
            {
                // The stream is empty and marked complete for adding. The Stream contract
                // says Read keeps returning 0 at end of stream rather than throwing.
                return 0;
            }
            if (count == 0)
            {
                return 0;
            }

            lock (CircleBuff)
            {
                DebugOut("Read: requested {0:N0} bytes. Available = {1:N0}.", count, ReadBytesAvailable);
                while (ReadBytesAvailable == 0)
                {
                    if (IsAddingCompleted)
                    {
                        IsCompleted = true;
                        return 0;
                    }
                    Monitor.Wait(CircleBuff);
                }

                // If Head < Tail, then there are bytes available at the end of the buffer
                // and also at the front of the buffer.
                // If reading from Tail to the end doesn't fulfill the request,
                // and there are still bytes available,
                // then read from the start of the buffer.
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                IncrementTail();
                int bytesToRead;
                if (Tail > Head)
                {
                    // When Tail > Head, we know that there are at least
                    // (CircleBuff.Length - Tail) bytes available in the buffer.
                    bytesToRead = CircleBuff.Length - Tail;
                }
                else
                {
                    bytesToRead = Head - Tail;
                }

                // Don't read more than count bytes!
                bytesToRead = Math.Min(bytesToRead, count);

                Buffer.BlockCopy(CircleBuff, Tail, buffer, offset, bytesToRead);
                Tail += (bytesToRead - 1);
                int bytesRead = bytesToRead;

                // At this point, either we've exhausted the buffer,
                // or Tail is at the end of the buffer and has to wrap around.
                if (bytesRead < count && ReadBytesAvailable > 0)
                {
                    // We haven't fulfilled the read.
                    IncrementTail();
                    // Tail is always equal to 0 here.
                    bytesToRead = Math.Min((count - bytesRead), (Head - Tail));
                    Buffer.BlockCopy(CircleBuff, Tail, buffer, offset + bytesRead, bytesToRead);
                    bytesRead += bytesToRead;
                    Tail += (bytesToRead - 1);
                }

                TotalBytesRead += bytesRead;
                DebugOut("Read: returning {0:N0} bytes. TotalRead={1:N0}", bytesRead, TotalBytesRead);
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                Monitor.Pulse(CircleBuff);
                return bytesRead;
            }
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            if (disposed)
            {
                throw new ObjectDisposedException("The stream has been disposed.");
            }
            if (IsAddingCompleted)
            {
                throw new InvalidOperationException("The stream has been marked as complete for adding.");
            }
            lock (CircleBuff)
            {
                DebugOut("Write: requested {0:N0} bytes. Available = {1:N0}", count, WriteBytesAvailable);
                int bytesWritten = 0;
                while (bytesWritten < count)
                {
                    while (WriteBytesAvailable == 0)
                    {
                        Monitor.Wait(CircleBuff);
                    }
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    int bytesToCopy = Math.Min((count - bytesWritten), WriteBytesAvailable);
                    CopyBytes(buffer, offset + bytesWritten, bytesToCopy);
                    TotalBytesWritten += bytesToCopy;
                    DebugOut("Write: {0} bytes written. TotalWritten={1:N0}", bytesToCopy, TotalBytesWritten);
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    bytesWritten += bytesToCopy;
                    Monitor.Pulse(CircleBuff);
                }
            }
        }


        private void CopyBytes(byte[] buffer, int srcOffset, int count)
        {
            // Insert at head
            // The copy might require two separate operations.

            // copy as much as can fit between Head and end of the circular buffer
            int offset = srcOffset;
            int bytesCopied = 0;
            int bytesToCopy = Math.Min(CircleBuff.Length - Head, count);
            if (bytesToCopy > 0)
            {
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                bytesCopied = bytesToCopy;
                Head = (Head + bytesToCopy) % CircleBuff.Length;
                offset += bytesCopied;
            }

            // Copy the remainder, which will go from the beginning of the buffer.
            if (bytesCopied < count)
            {
                bytesToCopy = count - bytesCopied;
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                Head = (Head + bytesToCopy) % CircleBuff.Length;
            }
        }

        public void CompleteAdding()
        {
            if (disposed)
            {
                throw new ObjectDisposedException("The stream has been disposed.");
            }
            lock (CircleBuff)
            {
                DebugOut("CompleteAdding: {0:N0} bytes written.", TotalBytesWritten);
                IsAddingCompleted = true;
                Monitor.Pulse(CircleBuff);
            }
        }

        public override bool CanRead { get { return true; } }

        public override bool CanSeek { get { return false; } }

        public override bool CanWrite { get { return true; } }

        public override void Flush() { /* does nothing */ }

        // Forward-only stream: by convention these members throw NotSupportedException.
        public override long Length { get { throw new NotSupportedException(); } }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        private bool disposed = false;

        protected override void Dispose(bool disposing)
        {
            if (!disposed)
            {
                base.Dispose(disposing);
                disposed = true;
            }
        }
    }
}

Here is an example of how to use it:

using System;
using System.Text;
using System.Threading;
using Mischel.IO;

class Program
{
    static readonly string TestText = "This is a test of the emergency broadcast system.";
    static readonly byte[] TextBytes = Encoding.UTF8.GetBytes(TestText);

    const int Megabyte = 1024 * 1024;

    const int TestBufferSize = 12;

    const int ProducerBufferSize = 4;
    const int ConsumerBufferSize = 5;

    static void Main(string[] args)
    {
        Console.WriteLine("TextBytes contains {0:N0} bytes.", TextBytes.Length);
        using (var pcStream = new ProducerConsumerStream(TestBufferSize))
        {
            Thread ProducerThread = new Thread(ProducerThreadProc);
            Thread ConsumerThread = new Thread(ConsumerThreadProc);
            ProducerThread.Start(pcStream);
            Thread.Sleep(2000);
            ConsumerThread.Start(pcStream);

            ProducerThread.Join();
            ConsumerThread.Join();
        }
        Console.Write("Done. Press Enter.");
        Console.ReadLine();
    }

    static void ProducerThreadProc(object state)
    {
        Console.WriteLine("Producer: started.");
        var pcStream = (ProducerConsumerStream)state;
        int offset = 0;
        // Count bytes, not characters: the two only match because TestText is ASCII.
        while (offset < TextBytes.Length)
        {
            int bytesToWrite = Math.Min(ProducerBufferSize, TextBytes.Length - offset);
            pcStream.Write(TextBytes, offset, bytesToWrite);
            offset += bytesToWrite;
        }
        pcStream.CompleteAdding();
        Console.WriteLine("Producer: {0:N0} total bytes written.", offset);
        Console.WriteLine("Producer: exit.");
    }

    static void ConsumerThreadProc(object state)
    {
        Console.WriteLine("Consumer: started.");
        var instream = (ProducerConsumerStream)state;
        int testOffset = 0;

        var inputBuffer = new byte[TextBytes.Length];

        int bytesRead;
        do
        {
            int bytesToRead = Math.Min(ConsumerBufferSize, inputBuffer.Length - testOffset);
            bytesRead = instream.Read(inputBuffer, testOffset, bytesToRead);
            //Console.WriteLine("Consumer: {0:N0} bytes read.", bytesRead);
            testOffset += bytesRead;
        } while (bytesRead != 0);
        Console.WriteLine("Consumer: {0:N0} total bytes read.", testOffset);

        // Compare bytes read with TextBytes
        for (int i = 0; i < TextBytes.Length; ++i)
        {
            if (inputBuffer[i] != TextBytes[i])
            {
                Console.WriteLine("Read error at position {0}", i);
                break;
            }
        }
        Console.WriteLine("Consumer: exit.");
    }
}
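The same one-producer/one-consumer idea can be sketched with much less index arithmetic by letting BlockingCollection<T> provide the bounded, blocking queue. This swaps in a different technique from the author's circular buffer: BlockingChunkStream is a hypothetical name, and its bound counts queued chunks rather than bytes.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical alternative to ProducerConsumerStream: each Write enqueues a copy of
// its bytes in a bounded BlockingCollection. Intended for one producer and one consumer.
public sealed class BlockingChunkStream : Stream
{
    private readonly BlockingCollection<byte[]> _chunks;
    private byte[] _current = Array.Empty<byte>();
    private int _currentPos;

    // maxQueuedChunks bounds memory: Write blocks while that many chunks are unread.
    public BlockingChunkStream(int maxQueuedChunks) =>
        _chunks = new BlockingCollection<byte[]>(maxQueuedChunks);

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        var copy = new byte[count];
        Buffer.BlockCopy(buffer, offset, copy, 0, count);
        _chunks.Add(copy); // blocks while the queue is full
    }

    // The producer signals that no more data will be written.
    public void CompleteAdding() => _chunks.CompleteAdding();

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        if (_currentPos == _current.Length)
        {
            // Blocks until a chunk arrives; returns false once adding is complete
            // and the queue is empty, which we report as end of stream.
            if (!_chunks.TryTake(out byte[] next, Timeout.Infinite)) return 0;
            _current = next;
            _currentPos = 0;
        }
        int n = Math.Min(count, _current.Length - _currentPos);
        Buffer.BlockCopy(_current, _currentPos, buffer, offset, n);
        _currentPos += n;
        return n;
    }

    public override void Flush() { }
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing) _chunks.Dispose();
        base.Dispose(disposing);
    }
}
```

The trade-off is an allocation per Write call, in exchange for not maintaining Head/Tail wrap-around logic by hand.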

@JimMischel Let me know too, so I can update my answer; it's still on the Wayback Machine: https://web.archive.org/web/20151210235510/http://www.informit.com/guides/content.aspx?g=dotnet&seqNum=852 - drzaus
@drzaus: I copied the code from my article into the answer here. - Jim Mischel


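I had the same problem. In my case a third-party package only accepts a stream, but I have an IEnumerable. I couldn't find an answer online, so I wrote my own code, which I'm sharing here: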

public class IEnumerableStringReader : TextReader
{
    private readonly IEnumerator<string> _enumerator;

    private bool eof = false; // is set to true when .MoveNext tells us there is no more data.
    private char[] curLine = null;
    private int curLinePos = 0;

    private bool disposed = false;

    public IEnumerableStringReader(IEnumerable<string> input)
    {
        _enumerator = input.GetEnumerator();
    }

    private void GetNextLine()
    {
        if (eof) return;

        eof = !_enumerator.MoveNext();
        if (eof) return;

        curLine = $"{_enumerator.Current}\r\n" // IEnumerable<string> input implies newlines exist between the lines.
            .ToCharArray();

        curLinePos = 0;
    }

    public override int Peek()
    {
        if (disposed) throw new ObjectDisposedException("The stream has been disposed.");

        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return -1;

        return curLine[curLinePos];
    }

    public override int Read()
    {
        if (disposed) throw new ObjectDisposedException("The stream has been disposed.");

        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return -1;

        return curLine[curLinePos++];
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (disposed) throw new ObjectDisposedException("The stream has been disposed.");
        if (count == 0) return 0;

        int charsReturned = 0;
        int maxChars = Math.Min(count, buffer.Length - index); // Unless input runs out, return count characters; if the buffer has less room than that, return as many as fit.

        while (charsReturned < maxChars)
        {
            if (curLine == null || curLinePos == curLine.Length) GetNextLine();
            if (eof) return charsReturned;

            int maxCurrentCopy = maxChars - charsReturned;
            int charsAtTheReady = curLine.Length - curLinePos; // chars available in current line                
            int copySize = Math.Min(maxCurrentCopy, charsAtTheReady); // stop at end of buffer.

            // Buffer.BlockCopy takes byte offsets; Array.ConstrainedCopy takes element indexes, which is simpler for char arrays.
            Array.ConstrainedCopy(curLine, curLinePos, buffer, index, copySize);

            index += copySize;
            curLinePos += copySize;
            charsReturned += copySize;
        }

        return charsReturned;
    }

    public override string ReadLine()
    {
        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return null;

        if (curLinePos > 0) // this is necessary in case the client uses both Read() and ReadLine() calls
        {
            // Return the remainder of the current line, excluding the "\r\n" that GetNextLine appends.
            // Math.Max guards against Read() having already consumed part of that "\r\n".
            var tmp = new string(curLine, curLinePos, Math.Max(0, curLine.Length - curLinePos - 2));
            curLinePos = curLine.Length; // so next call will re-read
            return tmp;
        }

        // read full line.
        curLinePos = curLine.Length; // so next call will re-read
        return _enumerator.Current; // if all the client does is call ReadLine this (faster) code path will be taken.                       
    }

    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            _enumerator.Dispose();
            base.Dispose(disposing);
            disposed = true;
        }
    }
}

In my case, I wanted to use it as the input to Datastreams.Csv:

using (var tr = new IEnumerableStringReader(input))
using (var reader = new CsvReader(tr))
{
  while (reader.ReadRecord())
  {
    // do whatever
  }
}

Using the EnumerableToStream NuGet package, you can implement your method like this:
using EnumerableToStream;

private Stream ToStringStream(Encoding encoding, IEnumerable<string> data)
{
    return data.ToStream(encoding);
}

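I had the same need and ended up writing my own implementation, which I've been using for a while. Getting all the small details right took some time and effort. For example, you want the IEnumerable to be disposed once the stream has been read to the end, and you don't want a multi-byte character to be written only partially into the buffer.
This particular implementation does no allocations while the stream is being read, unlike other implementations that call encoding.GetBytes(line).
After seeing this question, I decided to publish the code as a NuGet package. I hope it saves you a few hours. The source code is on GitHub.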
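The disposal detail matters because cleanup code in an iterator, such as a finally block or a using statement that closes a data reader, runs only when the iterator finishes or when its enumerator is disposed. A small illustration with a hypothetical Rows() source:

```csharp
using System;
using System.Collections.Generic;

static class IteratorCleanupDemo
{
    // Set to true when the iterator's cleanup code has run.
    public static bool Released;

    // Hypothetical data source; the finally block stands in for closing a reader or file.
    public static IEnumerable<string> Rows()
    {
        try
        {
            yield return "row 1";
            yield return "row 2";
        }
        finally
        {
            Released = true;
        }
    }
}
```

If a consumer stops reading early and the stream wrapper never disposes its enumerator, the finally block does not run at all, because compiler-generated iterators have no finalizer.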


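Steve Sadler wrote a perfect answer. However, he made it harder than necessary.

According to the reference source of TextReader, you only need to override Peek and Read:

Subclasses must minimally implement the Peek() and Read() methods.

So first I write a function that converts the IEnumerable<string> into an IEnumerable<char>, adding a newline at the end of each string: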

private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
{
    foreach (string line in lines)
    {
        foreach (char c in line + Environment.NewLine)
        {
            yield return c;
        }
    }
}

Environment.NewLine is the part that adds a newline at the end of each string.

Now the class is fairly simple:

class EnumStringReader : TextReader
{
    public EnumStringReader(IEnumerable<string> lines)
    {
        this.enumerator = ReadCharacters(lines).GetEnumerator();
        this.dataAvailable = this.enumerator.MoveNext();
    }
    private bool disposed = false;
    private bool dataAvailable;
    private readonly IEnumerator<char> enumerator;

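The constructor takes the sequence of lines to read. It passes this sequence to the function written earlier, which converts it into a sequence of characters with Environment.NewLine appended to each line.

It gets the enumerator of the converted sequence and moves to the first character. It remembers in dataAvailable whether there is a first character.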

Now we're ready for Peek: if no data is available, return -1; otherwise return the current character as an int. Don't move forward:

public override int Peek()
{
    this.ThrowIfDisposed();
    return this.dataAvailable ? this.enumerator.Current : -1;
}

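Read: if no data is available, return -1; otherwise return the current character as an int. Move forward to the next character and remember whether data is available: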

public override int Read()
{
    this.ThrowIfDisposed();
    if (this.dataAvailable)
    {
        char nextChar = this.enumerator.Current;
        this.dataAvailable = this.enumerator.MoveNext();
        return (int)nextChar;
    }
    else
    {
        return -1;
    }
}

Don't forget to override the Dispose(bool) method, where you dispose the enumerator.
That's all that's needed. All the other methods are built on these two.
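For reference, here are the answer's pieces assembled into one compilable class. ThrowIfDisposed and the Dispose(bool) override are only described in the answer, so their bodies below are an assumed implementation:

```csharp
using System;
using System.Collections.Generic;
using System.IO;

class EnumStringReader : TextReader
{
    private readonly IEnumerator<char> enumerator;
    private bool dataAvailable;
    private bool disposed = false;

    public EnumStringReader(IEnumerable<string> lines)
    {
        this.enumerator = ReadCharacters(lines).GetEnumerator();
        this.dataAvailable = this.enumerator.MoveNext();
    }

    // Flattens the lines into characters, appending Environment.NewLine after each line.
    private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
    {
        foreach (string line in lines)
            foreach (char c in line + Environment.NewLine)
                yield return c;
    }

    public override int Peek()
    {
        this.ThrowIfDisposed();
        return this.dataAvailable ? this.enumerator.Current : -1;
    }

    public override int Read()
    {
        this.ThrowIfDisposed();
        if (!this.dataAvailable) return -1;
        char nextChar = this.enumerator.Current;
        this.dataAvailable = this.enumerator.MoveNext();
        return nextChar;
    }

    // Assumed helper: the answer calls it but does not show it.
    private void ThrowIfDisposed()
    {
        if (this.disposed) throw new ObjectDisposedException(nameof(EnumStringReader));
    }

    // Assumed implementation of the Dispose(bool) override the answer asks for.
    protected override void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing) this.enumerator.Dispose();
            this.disposed = true;
        }
        base.Dispose(disposing);
    }
}
```

ReadLine, ReadToEnd and the block-reading methods come from the TextReader base class, which implements them in terms of Read() and Peek().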
Now let's fill your stream with the lines:
IEnumerable<string> lines = ...
using (TextWriter writer = System.IO.File.CreateText(...))
{
    using (TextReader reader = new EnumStringReader(lines))
    {
        // either write per char:
        while (reader.Peek() != -1)
        {
            char c = (char)reader.Read();
            writer.Write(c);
        } 

        // or write per line:
        string line = reader.ReadLine();
        // line is without newLine!
        while (line != null)
        {
            writer.WriteLine(line);
            line = reader.ReadLine();
        }

        // or write per block
        char[] buf = new char[4096];
        int nrRead = reader.ReadBlock(buf, 0, buf.Length);
        while (nrRead > 0)
        {
            writer.Write(buf, 0, nrRead);
            nrRead = reader.ReadBlock(buf, 0, buf.Length);
        }
    }
}

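If you can define a maximum length for the strings in the IEnumerable, the solution becomes very simple. If the source is, for example, SQL rows, the maximum length is just the sum of the column widths. Note that the BufferedStream buffer size is in bytes, so the maximum length must be measured in encoded (UTF-8) bytes, not characters.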
IEnumerable<string> input = //todo assign
int maxLength = 1024;
var stream = new BufferedStream(new EnumerableStream(input), maxLength);

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class EnumerableStream : Stream
{
    private readonly IEnumerator<string> enumerator;
    public EnumerableStream(IEnumerable<string> input) => this.enumerator = input.GetEnumerator();
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { } // read-only: nothing to flush

    // Each call returns the UTF-8 bytes of exactly one string. This relies on the BufferedStream
    // wrapper always requesting at least maxLength bytes; it may pass a non-zero offset, though.
    public override int Read(byte[] buffer, int offset, int count)
    {
        string current;
        do
        {
            if (!enumerator.MoveNext()) return 0;
            current = enumerator.Current ?? string.Empty;
        } while (current.Length == 0); // returning 0 for an empty string would signal end of stream

        // Respect offset and count; throws ArgumentException if the encoded string exceeds count bytes.
        return Encoding.UTF8.GetBytes(current, buffer.AsSpan(offset, count));
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    protected override void Dispose(bool disposing)
    {
        enumerator.Dispose();
        base.Dispose(disposing);
    }
}
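Because the buffer size passed to BufferedStream counts bytes while column widths usually count characters, a non-ASCII row can need more bytes than its width suggests (UTF-8 uses up to three bytes per UTF-16 character). A minimal sketch of deriving a safe size from a character width, assuming UTF-8 as in the answer (Utf8Sizing.SafeBufferSize is a hypothetical helper name):

```csharp
using System;
using System.Text;

static class Utf8Sizing
{
    // Hypothetical helper: a buffer size guaranteed to hold the UTF-8 encoding
    // of any string of up to maxChars UTF-16 characters.
    public static int SafeBufferSize(int maxChars) => Encoding.UTF8.GetMaxByteCount(maxChars);
}
```

The setup line would then read `new BufferedStream(new EnumerableStream(input), Utf8Sizing.SafeBufferSize(maxLength))`, where maxLength is the maximum number of characters per string.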

Content provided by Stack Overflow.