How do I concatenate two System.IO.Stream instances into one?

38

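Suppose I want to stream three files to a user one after another, but instead of him handing me a Stream object to push bytes into, I have to hand him a Stream object that he pulls bytes from. I'd like to take my three FileStream objects (or, even cleverer, an IEnumerable<Stream>) and return a new ConcatenatedStream object that pulls from the source streams on demand.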

5 Answers

38
class ConcatenatedStream : Stream
{
    Queue<Stream> streams;

    public ConcatenatedStream(IEnumerable<Stream> streams)
    {
        this.streams = new Queue<Stream>(streams);
    }

    public override bool CanRead
    {
        get { return true; }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int totalBytesRead = 0;

        while (count > 0 && streams.Count > 0)
        {
            int bytesRead = streams.Peek().Read(buffer, offset, count);
            if (bytesRead == 0)
            {
                streams.Dequeue().Dispose();
                continue;
            }

            totalBytesRead += bytesRead;
            offset += bytesRead;
            count -= bytesRead;
        }

        return totalBytesRead;
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override long Length
    {
        get { throw new NotImplementedException(); }
    }

    public override long Position
    {
        get
        {
            throw new NotImplementedException();
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }
}

10
Perhaps the solution is so elegant that it's illegal. - Muhammad Hasan Khan
4
This solution basically doesn't work, and it has a serious bug. If I try to read all the data in one go, I only get the data from the first stream. - Marc Gravell
2
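@HasanKhan I think that came from a bogus "moderator message" or something similar; as I said, "not me". However, I suspect that person meant that if they pass in a large buffer, it only reads from the first stream (or the first two) - it should probably be a while rather than an if. However! I'd argue that if they expect Read to fill as much of the buffer as possible, they aren't calling Read correctly. All Read has to do is return at least one byte, or EOF. - Marc Gravell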
3
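The problem is that bytesRead < count is not sufficient reason to close the stream at the head of the queue; it has to be bytesRead == 0. So if you are concatenating streams that operate on fixed block sizes (as CryptoStream does in some cases), a stream will be closed before it has been fully read unless count happens to match the block size exactly. - KingPong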
4
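The idea behind this solution is very good, but the implementation has a lot of problems. Given the other answers, I also don't understand why it was accepted. Nobody should use this code. - usr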
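
Marc Gravell's point about the Read contract can be illustrated from the caller's side. The following is only a minimal sketch: ReadFully is a hypothetical helper name, not something from any of the answers, and it assumes the caller really does want the buffer filled as far as possible.

```csharp
using System;
using System.IO;

// Hypothetical helper (not part of any answer above): keeps calling Read
// until `count` bytes have arrived or the stream reports end-of-stream.
int ReadFully(Stream source, byte[] buffer, int offset, int count)
{
    int total = 0;
    while (total < count)
    {
        // Read may legally return fewer bytes than requested; 0 means EOF.
        int n = source.Read(buffer, offset + total, count - total);
        if (n == 0)
            break;
        total += n;
    }
    return total;
}

// Usage: ask for more bytes than the stream holds.
var source = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
var buffer = new byte[8];
int got = ReadFully(source, buffer, 0, buffer.Length);
Console.WriteLine(got); // 5
```

With a loop like this on the caller's side, a concatenating stream only has to return at least one byte per call, or 0 once every source stream is exhausted.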

10
As long as you only need to read, here is my implementation of such a stream:

Note! Position and seeking are broken and need to be fixed.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

namespace LVK.IO
{
    /// <summary>
    /// This class is a <see cref="Stream"/> descendant that manages multiple underlying
    /// streams which are considered to be chained together to one large stream. Only reading
    /// and seeking is allowed, writing will throw exceptions.
    /// </summary>
    public class CombinedStream : Stream
    {
        private readonly Stream[] _UnderlyingStreams;
        private readonly Int64[] _UnderlyingStartingPositions;
        private Int64 _Position;
        private readonly Int64 _TotalLength;
        private int _Index;

        /// <summary>
        /// Constructs a new <see cref="CombinedStream"/> on top of the specified array
        /// of streams.
        /// </summary>
        /// <param name="underlyingStreams">
        /// An array of <see cref="Stream"/> objects that will be chained together and
        /// considered to be one big stream.
        /// </param>
        public CombinedStream(params Stream[] underlyingStreams)
        {
            if (underlyingStreams == null)
                throw new ArgumentNullException("underlyingStreams");
            foreach (Stream stream in underlyingStreams)
            {
                if (stream == null)
                    throw new ArgumentNullException("underlyingStreams[]");
                if (!stream.CanRead)
                    throw new InvalidOperationException("CanRead not true for all streams");
                if (!stream.CanSeek)
                    throw new InvalidOperationException("CanSeek not true for all streams");
            }

            _UnderlyingStreams = new Stream[underlyingStreams.Length];
            _UnderlyingStartingPositions = new Int64[underlyingStreams.Length];
            Array.Copy(underlyingStreams, _UnderlyingStreams, underlyingStreams.Length);

            _Position = 0;
            _Index = 0;

            _UnderlyingStartingPositions[0] = 0;
            for (int index = 1; index < _UnderlyingStartingPositions.Length; index++)
            {
                _UnderlyingStartingPositions[index] =
                    _UnderlyingStartingPositions[index - 1] +
                    _UnderlyingStreams[index - 1].Length;
            }

            _TotalLength =
                _UnderlyingStartingPositions[_UnderlyingStartingPositions.Length - 1] +
                _UnderlyingStreams[_UnderlyingStreams.Length - 1].Length;
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports reading.
        /// </summary>
        /// <value>
        /// <c>true</c>.
        /// </value>
        /// <returns>
        /// Always <c>true</c> for <see cref="CombinedStream"/>.
        /// </returns>
        public override Boolean CanRead
        {
            get
            {
                return true;
            }
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports seeking.
        /// </summary>
        /// <value>
        /// <c>true</c>.
        /// </value>
        /// <returns>
        /// Always <c>true</c> for <see cref="CombinedStream"/>.
        /// </returns>
        public override Boolean CanSeek
        {
            get
            {
                return true;
            }
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports writing.
        /// </summary>
        /// <value>
        /// <c>false</c>.
        /// </value>
        /// <returns>
        /// Always <c>false</c> for <see cref="CombinedStream"/>.
        /// </returns>
        public override Boolean CanWrite
        {
            get
            {
                return false;
            }
        }

        /// <summary>
        /// When overridden in a derived class, clears all buffers for this stream and causes any buffered data to be written to the underlying device.
        /// </summary>
        /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
        public override void Flush()
        {
            foreach (Stream stream in _UnderlyingStreams)
            {
                stream.Flush();
            }
        }

        /// <summary>
        /// Gets the total length in bytes of the underlying streams.
        /// </summary>
        /// <value>
        /// The total length of the underlying streams.
        /// </value>
        /// <returns>
        /// A long value representing the total length of the underlying streams in bytes.
        /// </returns>
        /// <exception cref="T:System.NotSupportedException">A class derived from Stream does not support seeking. </exception>
        /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
        public override Int64 Length
        {
            get
            {
                return _TotalLength;
            }
        }

        /// <summary>
        /// Gets or sets the position within the current stream.
        /// </summary>
        /// <value></value>
        /// <returns>The current position within the stream.</returns>
        /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
        /// <exception cref="T:System.NotSupportedException">The stream does not support seeking. </exception>
        /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
        public override Int64 Position
        {
            get
            {
                return _Position;
            }

            set
            {
                if (value < 0 || value > _TotalLength)
                    throw new ArgumentOutOfRangeException("Position");

                _Position = value;
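                if (value == _TotalLength)
                {
                    // At the very end: point at the last stream. _Position keeps the
                    // combined position; overwriting it with the last stream's length
                    // made Seek(0, SeekOrigin.End) report the wrong value.
                    _Index = _UnderlyingStreams.Length - 1;
                }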

                else
                {
                    while (_Index > 0 && _Position < _UnderlyingStartingPositions[_Index])
                    {
                        _Index--;
                    }

                    while (_Index < _UnderlyingStreams.Length - 1 &&
                           _Position >= _UnderlyingStartingPositions[_Index] + _UnderlyingStreams[_Index].Length)
                    {
                        _Index++;
                    }
                }
            }
        }

        /// <summary>
        /// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
        /// </summary>
        /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and (offset + count - 1) replaced by the bytes read from the current source.</param>
        /// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the current stream.</param>
        /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
        /// <returns>
        /// The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
        /// </returns>
        /// <exception cref="T:System.ArgumentException">The sum of offset and count is larger than the buffer length. </exception>
        /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
        /// <exception cref="T:System.NotSupportedException">The stream does not support reading. </exception>
        /// <exception cref="T:System.ArgumentNullException">buffer is null. </exception>
        /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException">offset or count is negative. </exception>
        public override int Read(Byte[] buffer, int offset, int count)
        {
            int result = 0;
            while (count > 0)
            {
                _UnderlyingStreams[_Index].Position = _Position - _UnderlyingStartingPositions[_Index];
                int bytesRead = _UnderlyingStreams[_Index].Read(buffer, offset, count);
                result += bytesRead;
                offset += bytesRead;
                count -= bytesRead;
                _Position += bytesRead;

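                if (bytesRead == 0)
                {
                    // The current stream is exhausted: move to the next one, or stop
                    // after the last. A short read alone does not mean end of stream.
                    if (_Index < _UnderlyingStreams.Length - 1)
                        _Index++;
                    else
                        break;
                }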
            }

            return result;
        }

        /// <summary>
        /// Sets the position within the current stream.
        /// </summary>
        /// <param name="offset">A byte offset relative to the origin parameter.</param>
        /// <param name="origin">A value of type <see cref="T:System.IO.SeekOrigin"></see> indicating the reference point used to obtain the new position.</param>
        /// <returns>
        /// The new position within the current stream.
        /// </returns>
        /// <exception cref="T:System.IO.IOException">An I/O error occurs. </exception>
        /// <exception cref="T:System.NotSupportedException">The stream does not support seeking, such as if the stream is constructed from a pipe or console output. </exception>
        /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            switch (origin)
            {
                case SeekOrigin.Begin:
                    Position = offset;
                    break;

                case SeekOrigin.Current:
                    Position += offset;
                    break;

                case SeekOrigin.End:
                    Position = Length + offset;
                    break;
            }

            return Position;
        }

        /// <summary>
        /// Throws <see cref="NotSupportedException"/> since the <see cref="CombinedStream"/>
        /// class does not support changing the length.
        /// </summary>
        /// <param name="value">The desired length of the current stream in bytes.</param>
        /// <exception cref="T:System.NotSupportedException">
        /// <see cref="CombinedStream"/> does not support this operation.
        /// </exception>
        public override void SetLength(long value)
        {
            throw new NotSupportedException("The method or operation is not supported by CombinedStream.");
        }

        /// <summary>
        /// Throws <see cref="NotSupportedException"/> since the <see cref="CombinedStream"/>
        /// class does not support writing to the underlying streams.
        /// </summary>
        /// <param name="buffer">An array of bytes.  This method copies count bytes from buffer to the current stream.</param>
        /// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes to the current stream.</param>
        /// <param name="count">The number of bytes to be written to the current stream.</param>
        /// <exception cref="T:System.NotSupportedException">
        /// <see cref="CombinedStream"/> does not support this operation.
        /// </exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException("The method or operation is not supported by CombinedStream.");
        }
    }
}

1
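An implementation of Dispose should be added to take care of disposing the underlying streams. - NineBerry
This is a read-only stream; shouldn't the Flush() method be empty or throw NotSupportedException? - dbardakov
I tend to agree, but the documentation also mentions "clearing buffers", which could mean releasing the memory held by data buffered from disk. So I've left it as it is; I should probably document better why I left it there. - Lasse V. Karlsen
The return values of Position and Seek seem to be wrong. Proof:
var ms1 = new MemoryStream(new byte[] { 1,2 });
var ms2 = new MemoryStream(new byte[] { 1,2,3 });
var combined = new CombinedStream(ms1, ms2);
combined.Length.Dump(); // 5, correct
combined.Seek(0, SeekOrigin.End).Dump(); // actual 3, expected 5
combined.Position.Dump(); // actual 3, expected 5
- Pavel Martynov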
1
I've created a GitHub project for this - https://github.com/lassevk/Streams - and will soon commit some tests and fixes (probably not until after the weekend). - Lasse V. Karlsen

9
Untested, but something like this:
class StreamEnumerator : Stream
{
    private long position;
    bool closeStreams;
    IEnumerator<Stream> iterator;
    Stream current;
    private void EndOfStream() {
        if (closeStreams && current != null)
        {
            current.Close();
            current.Dispose();
        }
        current = null;
    }
    private Stream Current
    {
        get {
            if(current != null) return current;
            if (iterator == null) throw new ObjectDisposedException(GetType().Name);
            if (iterator.MoveNext()) {
                current = iterator.Current;
            }
            return current;
        }
    }
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            EndOfStream();
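            // Dispose may be called more than once; the iterator is null after the first call.
            if (iterator != null) iterator.Dispose();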
            iterator = null;
            current = null;
        }
        base.Dispose(disposing);
    }
    public StreamEnumerator(IEnumerable<Stream> source, bool closeStreams)
    {
        if (source == null) throw new ArgumentNullException("source");
        iterator = source.GetEnumerator();
        this.closeStreams = closeStreams;
    }
    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return false; } }
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
    public override void WriteByte(byte value)
    {
        throw new NotSupportedException();
    }
    public override bool CanSeek { get { return false; } }
    public override bool CanTimeout { get { return false; } }
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }
    public override void Flush()
    { /* nothing to do */ }
    public override long Length
    {
        get { throw new NotSupportedException(); }
    }
    public override long Position
    {
        get { return position; }
        set { if (value != this.position) throw new NotSupportedException(); }
    }
    public override int Read(byte[] buffer, int offset, int count)
    {
        int result = 0;
        while (count > 0)
        {
            Stream stream = Current;
            if (stream == null) break;
            int thisCount = stream.Read(buffer, offset, count);
            result += thisCount;
            count -= thisCount;
            offset += thisCount;
            if (thisCount == 0) EndOfStream();
        }
        position += result;
        return result;
    }
}

2

Edit: made it clear that this is a seekable option.

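Here is an option that supports seeking, which is necessary in many situations. It lacks some handling for disposing of the underlying streams, and it could be more efficient if you are willing to assume that the underlying streams don't change length: the total length and the stream offsets could then be computed once.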

public sealed class SeekableConcatenatedStream : Stream
{
    List<Stream> streams;
    private long _Position { get; set; }

    public SeekableConcatenatedStream(List<Stream> streams)
    {
        foreach (var s in streams)
        {
          if (!s.CanSeek)
              throw new ArgumentException($"All provided streams must be seekable to create a {nameof(SeekableConcatenatedStream)}");
        }

        this.streams = streams;
        Seek(0, SeekOrigin.Begin);
    }

    public override bool CanRead
    {
        get { return true; }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        var bytesRead = 0;
        var streamStart = 0L; // combined offset at which streams[i] begins

        for (var i = 0; i < streams.Count && bytesRead < count; i++)
        {
            var streamLength = streams[i].Length;
            if (_Position >= streamStart && _Position < streamStart + streamLength)
            {
                // Position the sub-stream explicitly, so that whatever earlier
                // reads or seeks left behind on it does not matter.
                streams[i].Position = _Position - streamStart;

                // A sub-stream may return fewer bytes than requested, so keep
                // reading from it until it reports 0 (its end) or the buffer is full.
                int r;
                while (bytesRead < count &&
                       (r = streams[i].Read(buffer, offset + bytesRead, count - bytesRead)) > 0)
                {
                    bytesRead += r;
                    _Position += r;
                }
            }
            streamStart += streamLength;
        }

        return bytesRead;
    }

    public override bool CanSeek
    {
        get { return true; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override long Length
    {
        get {
            long length = 0;
            for (var i = 0; i < streams.Count; i++)
            {
                length += streams[i].Length;
            }
            return length;
        }
    }

    public override long Position
    {
        get
        {
            return _Position;
        }
        set
        {
            Seek(value, SeekOrigin.Begin);
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long newPosition;
        switch (origin)
        {
            case SeekOrigin.Begin: newPosition = offset; break;
            case SeekOrigin.Current: newPosition = _Position + offset; break;
            // For SeekOrigin.End the offset is added (it is normally zero or negative).
            case SeekOrigin.End: newPosition = Length + offset; break;
            default: throw new ArgumentException("Unknown seek origin.", nameof(origin));
        }

        if (newPosition < 0)
            throw new IOException("Cannot seek before the beginning of the stream.");

        _Position = newPosition;

        // Position every sub-stream to match: streams before the target sit at
        // their end, the one containing it at the local offset, later ones at 0.
        var streamStart = 0L;
        foreach (var s in streams)
        {
            var length = s.Length;
            s.Position = Math.Max(0L, Math.Min(length, newPosition - streamStart));
            streamStart += length;
        }

        // Seeking to or past the end is allowed; Read then simply returns 0.
        return _Position;
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }
}
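
The remark above about computing the stream offsets once could look roughly like the sketch below. It assumes the sub-stream lengths never change after construction; BuildStartOffsets and FindStreamIndex are hypothetical helper names introduced only for this illustration and are not part of the answer's class.

```csharp
using System;

// Hypothetical helper: start offset of each sub-stream inside the combined
// stream, given the sub-stream lengths. Computed once, up front.
long[] BuildStartOffsets(long[] lengths)
{
    var starts = new long[lengths.Length];
    long running = 0;
    for (int i = 0; i < lengths.Length; i++)
    {
        starts[i] = running;
        running += lengths[i];
    }
    return starts;
}

// Hypothetical helper: index of the sub-stream that contains `position`.
// Assumes 0 <= position < total length and at least one sub-stream.
int FindStreamIndex(long[] starts, long position)
{
    int i = Array.BinarySearch(starts, position);
    if (i < 0)
        return ~i - 1; // not an exact start: take the stream that begins before it

    // Empty sub-streams share a start offset with their successor; pick the
    // last one, which is the one that actually holds data at this position.
    while (i + 1 < starts.Length && starts[i + 1] == position)
        i++;
    return i;
}

// Usage: three sub-streams of 2, 3 and 4 bytes.
var starts = BuildStartOffsets(new long[] { 2, 3, 4 }); // { 0, 2, 5 }
int index = FindStreamIndex(starts, 3);                 // 1
long local = 3 - starts[index];                         // 1
Console.WriteLine($"{index} {local}");
```

With the offsets cached like this, Read and Seek would no longer need to sum Length over every sub-stream on each call; the lookup becomes a binary search instead of a linear scan.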

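This isn't a good solution, because not all Stream implementations support seeking (and therefore not all streams provide Length). Your code relies heavily on Length being available, so it only works with seekable streams. - ProgrammingLlama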
1
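I tried to put the disclaimer up front with the statement "Here is an option that supports seeking". It's reasonable to assume that some problems can only be solved with seekable streams. If you want to provide a concatenated stream in that situation, it needs to be seekable too. So I think it's unfair to say it's "not a good solution". Your point does suggest a reasonable improvement: constructing a seekable concatenated stream should fail if any of the sub-streams isn't seekable. I'll update the code to reflect that if I have time. - kriskalish
You can certainly concatenate non-seekable streams, as the accepted answer demonstrates. The CanSeek check you added at least makes your answer fail more gracefully in that case, so I'll remove my downvote. - ProgrammingLlama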

1
Why not use a container that already encapsulates multiple files, such as ZipOutputStream from SharpZipLib?

1
Nice, but the data I'm sending isn't actually files; I'm generating it on the fly, so I want to keep as little of it in memory as possible. - Sebastian Good
1
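Whether or not you are sending files doesn't matter; the point is to write the streams you care about to a "ZipOutputStream", preceding each one with a "ZipEntry" marker. - Ed Courtenay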
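
The container idea from this answer could be sketched as follows. Note the substitution: to avoid an external dependency, this uses the framework's System.IO.Compression.ZipArchive in place of SharpZipLib's ZipOutputStream, so it shows the same "one entry per source stream" pattern rather than the exact API named above. WriteZip and the entry names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

// Hypothetical helper: writes each named source stream as one zip entry.
void WriteZip(Stream output, IEnumerable<KeyValuePair<string, Stream>> parts)
{
    // leaveOpen: true so the caller keeps ownership of `output`.
    using (var archive = new ZipArchive(output, ZipArchiveMode.Create, leaveOpen: true))
    {
        foreach (var part in parts)
        {
            ZipArchiveEntry entry = archive.CreateEntry(part.Key);
            using (Stream entryStream = entry.Open())
            {
                // CopyTo moves the data through a small buffer, so this helper
                // never holds a whole source stream in memory.
                part.Value.CopyTo(entryStream);
            }
        }
    }
}

// Usage with two in-memory sources standing in for generated data.
var output = new MemoryStream();
WriteZip(output, new[]
{
    new KeyValuePair<string, Stream>("a.bin", new MemoryStream(new byte[] { 1, 2 })),
    new KeyValuePair<string, Stream>("b.bin", new MemoryStream(new byte[] { 3, 4, 5 })),
});
Console.WriteLine(output.Length > 0);
```

Unlike the concatenating streams above, this is a push model: the code writes into an output stream rather than handing the consumer a stream to pull from, so it only fits when the receiver can accept a zip container.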

Content provided by Stack Overflow.