Implementing an async Stream for producer/consumer

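There is a library that writes its results into a given Stream object. I would like to start consuming the results before the library has finished. To keep usage simple and to stop the producer from using too much memory when it runs far ahead, the Stream should be blocking; to let the producer and the consumer live independently, it should also be thread safe.

Once the library finishes, the producer thread should close the stream, which tells the consumer that there is no more data.

I was thinking of using NetworkStream or PipeStream (anonymous), but both are probably slow because they send the data through the kernel.

Any recommendations?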

var stream = new AsyncBlockingBufferedStream();

void ProduceData()
{
  // In producer thread
  externalLib.GenerateData(stream);
  stream.Close();
}

void ConsumeData()
{
  // In consumer thread
  int read;
  while ((read = stream.Read(...)) != 0)
  { ... }
}

It looks like http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx might be a useful option. - µBio
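Regarding "both are probably slow because they send the data through the kernel": I would verify that this is actually the case. You may be pleasantly surprised. - kbrimington
Possible duplicate of "Thread-safe async byte queue". - dtb
@BioBuckyBall - Yes, RX is great, but it cannot work with System.IO.Stream. @kbrimington - Yes, it is an assumption, but a kernel call is a kernel call :) - Yuri Astrakhan
Another solution - aepot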
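
kbrimington's suggestion to measure can be tried with a small sketch like the one below. It pushes a number of bytes through an in-process anonymous pipe and reports how long that took. The class name PipeTimingSketch, the method Run, and the 64 KB chunk size are made up for illustration; the result depends on the machine and proves nothing by itself.

```csharp
using System;
using System.Diagnostics;
using System.IO.Pipes;
using System.Threading.Tasks;

public static class PipeTimingSketch
{
    // Hypothetical helper: sends totalBytes through an anonymous pipe inside
    // one process and returns the number of bytes the reader received.
    public static long Run(long totalBytes, out TimeSpan elapsed)
    {
        var sw = Stopwatch.StartNew();
        long read = 0;
        using (var server = new AnonymousPipeServerStream(PipeDirection.Out))
        using (var client = new AnonymousPipeClientStream(
            PipeDirection.In, server.ClientSafePipeHandle))
        {
            var producer = Task.Run(() =>
            {
                var chunk = new byte[64 * 1024];
                for (long sent = 0; sent < totalBytes; sent += chunk.Length)
                {
                    int n = (int)Math.Min(chunk.Length, totalBytes - sent);
                    server.Write(chunk, 0, n);
                }

                // Closing the write end makes the reader's Read return 0.
                server.Dispose();
            });

            var buffer = new byte[64 * 1024];
            int got;
            while ((got = client.Read(buffer, 0, buffer.Length)) != 0)
            {
                read += got;
            }

            producer.Wait();
        }

        elapsed = sw.Elapsed;
        return read;
    }
}
```

Calling PipeTimingSketch.Run(100L * 1024 * 1024, out var elapsed) and comparing the elapsed time with the same amount of data sent through one of the in-memory streams from the answers would show whether the kernel round trip matters for a given workload.
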
3 Answers

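Based on Chris Taylor's earlier answer, here is my own revised version, with much faster block-based operations and a corrected write-completion notification. It is marked as wiki now, so you can change it.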

public class BlockingStream : Stream
{
    private readonly BlockingCollection<byte[]> _blocks;
    private byte[] _currentBlock;
    private int _currentBlockIndex;

    public BlockingStream(int streamWriteCountCache)
    {
        _blocks = new BlockingCollection<byte[]>(streamWriteCountCache);
    }

    public override bool CanTimeout { get { return false; } }
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override void Flush() {}
    public long TotalBytesWritten { get; private set; }
    public int WriteCount { get; private set; }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        int bytesRead = 0;
        while (true)
        {
            if (_currentBlock != null)
            {
                int copy = Math.Min(count - bytesRead, _currentBlock.Length - _currentBlockIndex);
                Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset + bytesRead, copy);
                _currentBlockIndex += copy;
                bytesRead += copy;

                if (_currentBlock.Length <= _currentBlockIndex)
                {
                    _currentBlock = null;
                    _currentBlockIndex = 0;
                }

                if (bytesRead == count)
                    return bytesRead;
            }

            if (!_blocks.TryTake(out _currentBlock, Timeout.Infinite))
                return bytesRead;
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        var newBuf = new byte[count];
        Array.Copy(buffer, offset, newBuf, 0, count);
        _blocks.Add(newBuf);
        TotalBytesWritten += count;
        WriteCount++;
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        if (disposing)
        {
            _blocks.Dispose();
        }
    }

    public override void Close()
    {
        CompleteWriting();
        base.Close();
    }

    public void CompleteWriting()
    {
        _blocks.CompleteAdding();
    }

    private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - offset < count)
            throw new ArgumentException("buffer.Length - offset < count");
    }
}
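
The write-completion notification in the class above rests on two BlockingCollection behaviours: Add blocks while the bounded collection is full, and TryTake with an infinite timeout returns false only once the collection is empty and CompleteAdding has been called. A minimal sketch of just that mechanism, with a made-up CompletionSketch class and made-up block sizes:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CompletionSketch
{
    // Hypothetical demo: returns the total number of bytes the consumer saw.
    public static int Run()
    {
        // At most 2 blocks may be queued at any time.
        using (var blocks = new BlockingCollection<byte[]>(2))
        {
            var producer = Task.Run(() =>
            {
                for (int i = 0; i < 5; i++)
                {
                    // Blocks while the queue already holds 2 blocks.
                    blocks.Add(new byte[10]);
                }

                // Tells the consumer that no more data will arrive.
                blocks.CompleteAdding();
            });

            int total = 0;
            byte[] block;

            // Waits for the next block; returns false only when the
            // collection is empty and adding has been completed.
            while (blocks.TryTake(out block, Timeout.Infinite))
            {
                total += block.Length;
            }

            producer.Wait();
            return total;
        }
    }
}
```

Here the consumer loop ends on its own after the five blocks, which is the same condition that makes Read in BlockingStream return fewer bytes than requested at the end of the stream.
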

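Using blocks will drastically improve the performance. You should consider supporting timeouts, or at the very least add support for cancelling the operations; otherwise a read or write can end up "deadlocked" under exceptional circumstances. If the reader crashes, the writer will block indefinitely, and vice versa: if the writer crashes prematurely, the reader may block on a read. I would really like to know whether the suggested idea solved your problem and whether it works as well as you hoped. - Chris Taylor
Since you are working with byte[], Buffer.BlockCopy would be faster than Array.Copy. Another suggestion is to return !_blocks.IsAddingCompleted for CanWrite. Just FYI. - atanamir
This code has a bug and can miss the last few lines of the stream. For a better implementation, consider: http://www.codeproject.com/Articles/345105/Memory-Stream-Multiplexer-write-and-read-from-many - MaYaN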
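
The Buffer.BlockCopy suggestion from the comments would change the copy inside Write roughly as sketched below. The helper name BlockCopySketch.CopyBlock is made up for illustration, and whether the call is actually faster than Array.Copy is the commenter's claim, not something measured here.

```csharp
using System;

public static class BlockCopySketch
{
    // Hypothetical helper: copies count bytes starting at offset into a
    // fresh array, as BlockingStream.Write does, using Buffer.BlockCopy.
    public static byte[] CopyBlock(byte[] buffer, int offset, int count)
    {
        var newBuf = new byte[count];

        // Offsets and count are measured in bytes; for byte[] that is the
        // same as element indices, so the arguments match Array.Copy's.
        Buffer.BlockCopy(buffer, offset, newBuf, 0, count);
        return newBuf;
    }
}
```
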

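I used Yurik's BlockingStream for a while, until performance dropped drastically after 20 minutes to an hour of running in our code. I believe the drop was caused by the garbage collector and the large number of buffers that approach creates when streaming a lot of data quickly (I did not have time to prove this). I ended up writing a ring-buffer version that does not suffer from performance degradation when used with our code.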

/// <summary>
/// A ring-buffer stream that you can read from and write to from
/// different threads.
/// </summary>
public class RingBufferedStream : Stream
{
    private readonly byte[] store;

    private readonly ManualResetEventAsync writeAvailable
        = new ManualResetEventAsync(false);

    private readonly ManualResetEventAsync readAvailable
        = new ManualResetEventAsync(false);

    private readonly CancellationTokenSource cancellationTokenSource
        = new CancellationTokenSource();

    private int readPos;

    private int readAvailableByteCount;

    private int writePos;

    private int writeAvailableByteCount;

    private bool disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="RingBufferedStream"/>
    /// class.
    /// </summary>
    /// <param name="bufferSize">
    /// The maximum number of bytes to buffer.
    /// </param>
    public RingBufferedStream(int bufferSize)
    {
        this.store = new byte[bufferSize];
        this.writeAvailableByteCount = bufferSize;
        this.readAvailableByteCount = 0;
    }

    /// <inheritdoc/>
    public override bool CanRead => true;

    /// <inheritdoc/>
    public override bool CanSeek => false;

    /// <inheritdoc/>
    public override bool CanWrite => true;

    /// <inheritdoc/>
    public override long Length
    {
        get
        {
            throw new NotSupportedException(
                "Cannot get length on RingBufferedStream");
        }
    }

    /// <inheritdoc/>
    public override int ReadTimeout { get; set; } = Timeout.Infinite;

    /// <inheritdoc/>
    public override int WriteTimeout { get; set; } = Timeout.Infinite;

    /// <inheritdoc/>
    public override long Position
    {
        get
        {
            throw new NotSupportedException(
                "Cannot get position on RingBufferedStream");
        }

        set
        {
            throw new NotSupportedException(
                "Cannot set position on RingBufferedStream");
        }
    }

    /// <summary>
    /// Gets the number of bytes currently buffered.
    /// </summary>
    public int BufferedByteCount => this.readAvailableByteCount;

    /// <inheritdoc/>
    public override void Flush()
    {
        // nothing to do
    }

    /// <summary>
    /// Set the length of the current stream. Always throws <see
    /// cref="NotSupportedException"/>.
    /// </summary>
    /// <param name="value">
    /// The desired length of the current stream in bytes.
    /// </param>
    public override void SetLength(long value)
    {
        throw new NotSupportedException(
            "Cannot set length on RingBufferedStream");
    }

    /// <summary>
    /// Sets the position in the current stream. Always throws <see
    /// cref="NotSupportedException"/>.
    /// </summary>
    /// <param name="offset">
    /// The byte offset to the <paramref name="origin"/> parameter.
    /// </param>
    /// <param name="origin">
    /// A value of type <see cref="SeekOrigin"/> indicating the reference
    /// point used to obtain the new position.
    /// </param>
    /// <returns>
    /// The new position within the current stream.
    /// </returns>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException("Cannot seek on RingBufferedStream");
    }

    /// <inheritdoc/>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException("RingBufferedStream");
        }

        Monitor.Enter(this.store);
        bool haveLock = true;
        try
        {
            while (count > 0)
            {
                if (this.writeAvailableByteCount == 0)
                {
                    this.writeAvailable.Reset();
                    Monitor.Exit(this.store);
                    haveLock = false;
                    bool canceled;
                    if (!this.writeAvailable.Wait(
                        this.WriteTimeout,
                        this.cancellationTokenSource.Token,
                        out canceled) || canceled)
                    {
                        break;
                    }

                    Monitor.Enter(this.store);
                    haveLock = true;
                }
                else
                {
                    var toWrite = this.store.Length - this.writePos;
                    if (toWrite > this.writeAvailableByteCount)
                    {
                        toWrite = this.writeAvailableByteCount;
                    }

                    if (toWrite > count)
                    {
                        toWrite = count;
                    }

                    Array.Copy(
                        buffer,
                        offset,
                        this.store,
                        this.writePos,
                        toWrite);
                    offset += toWrite;
                    count -= toWrite;
                    this.writeAvailableByteCount -= toWrite;
                    this.readAvailableByteCount += toWrite;
                    this.writePos += toWrite;
                    if (this.writePos == this.store.Length)
                    {
                        this.writePos = 0;
                    }

                    this.readAvailable.Set();
                }
            }
        }
        finally
        {
            if (haveLock)
            {
                Monitor.Exit(this.store);
            }
        }
    }

    /// <inheritdoc/>
    public override void WriteByte(byte value)
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException("RingBufferedStream");
        }

        Monitor.Enter(this.store);
        bool haveLock = true;
        try
        {
            while (true)
            {
                if (this.writeAvailableByteCount == 0)
                {
                    this.writeAvailable.Reset();
                    Monitor.Exit(this.store);
                    haveLock = false;
                    bool canceled;
                    if (!this.writeAvailable.Wait(
                        this.WriteTimeout,
                        this.cancellationTokenSource.Token,
                        out canceled) || canceled)
                    {
                        break;
                    }

                    Monitor.Enter(this.store);
                    haveLock = true;
                }
                else
                {
                    this.store[this.writePos] = value;
                    --this.writeAvailableByteCount;
                    ++this.readAvailableByteCount;
                    ++this.writePos;
                    if (this.writePos == this.store.Length)
                    {
                        this.writePos = 0;
                    }

                    this.readAvailable.Set();
                    break;
                }
            }
        }
        finally
        {
            if (haveLock)
            {
                Monitor.Exit(this.store);
            }
        }
    }

    /// <inheritdoc/>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException("RingBufferedStream");
        }

        Monitor.Enter(this.store);
        int ret = 0;
        bool haveLock = true;
        try
        {
            while (count > 0)
            {
                if (this.readAvailableByteCount == 0)
                {
                    this.readAvailable.Reset();
                    Monitor.Exit(this.store);
                    haveLock = false;
                    bool canceled;
                    if (!this.readAvailable.Wait(
                        this.ReadTimeout,
                        this.cancellationTokenSource.Token,
                        out canceled) || canceled)
                    {
                        break;
                    }

                    Monitor.Enter(this.store);
                    haveLock = true;
                }
                else
                {
                    var toRead = this.store.Length - this.readPos;
                    if (toRead > this.readAvailableByteCount)
                    {
                        toRead = this.readAvailableByteCount;
                    }

                    if (toRead > count)
                    {
                        toRead = count;
                    }

                    Array.Copy(
                        this.store,
                        this.readPos,
                        buffer,
                        offset,
                        toRead);
                    offset += toRead;
                    count -= toRead;
                    this.readAvailableByteCount -= toRead;
                    this.writeAvailableByteCount += toRead;
                    ret += toRead;
                    this.readPos += toRead;
                    if (this.readPos == this.store.Length)
                    {
                        this.readPos = 0;
                    }

                    this.writeAvailable.Set();
                }
            }
        }
        finally
        {
            if (haveLock)
            {
                Monitor.Exit(this.store);
            }
        }

        return ret;
    }

    /// <inheritdoc/>
    public override int ReadByte()
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException("RingBufferedStream");
        }

        Monitor.Enter(this.store);
        int ret = -1;
        bool haveLock = true;
        try
        {
            while (true)
            {
                if (this.readAvailableByteCount == 0)
                {
                    this.readAvailable.Reset();
                    Monitor.Exit(this.store);
                    haveLock = false;
                    bool canceled;
                    if (!this.readAvailable.Wait(
                        this.ReadTimeout,
                        this.cancellationTokenSource.Token,
                        out canceled) || canceled)
                    {
                        break;
                    }

                    Monitor.Enter(this.store);
                    haveLock = true;
                }
                else
                {
                    ret = this.store[this.readPos];
                    ++this.writeAvailableByteCount;
                    --this.readAvailableByteCount;
                    ++this.readPos;
                    if (this.readPos == this.store.Length)
                    {
                        this.readPos = 0;
                    }

                    this.writeAvailable.Set();
                    break;
                }
            }
        }
        finally
        {
            if (haveLock)
            {
                Monitor.Exit(this.store);
            }
        }

        return ret;
    }

    /// <inheritdoc/>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            this.disposed = true;
            this.cancellationTokenSource.Cancel();
        }

        base.Dispose(disposing);
    }
}
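
The wrap-around bookkeeping in Write and Read above (copy the largest contiguous run, then wrap the position back to 0) can be looked at in isolation. The sketch below is a made-up, single-threaded TinyRing class with no locking, blocking, or timeouts; it only illustrates the index arithmetic and is not a replacement for the stream.

```csharp
using System;

// Hypothetical illustration of ring-buffer index handling only.
public sealed class TinyRing
{
    private readonly byte[] store;
    private int readPos;
    private int writePos;
    private int count; // bytes currently buffered

    public TinyRing(int size)
    {
        this.store = new byte[size];
    }

    // Writes as many bytes as fit and returns how many were written.
    public int Write(byte[] src, int offset, int length)
    {
        int written = 0;
        while (length > 0 && this.count < this.store.Length)
        {
            // Largest contiguous run: limited by the end of the array,
            // by the free space, and by what the caller asked for.
            int n = Math.Min(
                Math.Min(this.store.Length - this.writePos,
                         this.store.Length - this.count),
                length);
            Array.Copy(src, offset, this.store, this.writePos, n);
            this.writePos = (this.writePos + n) % this.store.Length;
            offset += n;
            length -= n;
            this.count += n;
            written += n;
        }

        return written;
    }

    // Reads up to length bytes and returns how many were read.
    public int Read(byte[] dst, int offset, int length)
    {
        int read = 0;
        while (length > 0 && this.count > 0)
        {
            int n = Math.Min(
                Math.Min(this.store.Length - this.readPos, this.count),
                length);
            Array.Copy(this.store, this.readPos, dst, offset, n);
            this.readPos = (this.readPos + n) % this.store.Length;
            offset += n;
            length -= n;
            this.count -= n;
            read += n;
        }

        return read;
    }
}
```

With a 4-byte ring, writing 3 bytes, reading 2, and then writing 3 more forces the second write to split into a 1-byte run at the end of the array and a 2-byte run at the start. No array is allocated per write, which is the property the answer relies on to avoid garbage-collector pressure.
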

RingBufferedStream uses our ManualResetEventAsync class to help with a clean shutdown.

/// <summary>
///     Asynchronous version of <see cref="ManualResetEvent" />
/// </summary>
public sealed class ManualResetEventAsync
{
    /// <summary>
    /// The task completion source.
    /// </summary>
    private volatile TaskCompletionSource<bool> taskCompletionSource =
        new TaskCompletionSource<bool>();

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/>
    /// class with a <see cref="bool"/> value indicating whether to set the
    /// initial state to signaled.
    /// </summary>
    /// <param name="initialState">
    /// True to set the initial state to signaled; false to set the initial
    /// state to non-signaled.
    /// </param>
    public ManualResetEventAsync(bool initialState)
    {
        if (initialState)
        {
            this.Set();
        }
    }

    /// <summary>
    /// Return a task that can be consumed by <see cref="Task.Wait()"/>
    /// </summary>
    /// <returns>
    /// The asynchronous waiter.
    /// </returns>
    public Task GetWaitTask()
    {
        return this.taskCompletionSource.Task;
    }

    /// <summary>
    /// Mark the event as signaled.
    /// </summary>
    public void Set()
    {
        var tcs = this.taskCompletionSource;
        Task.Factory.StartNew(
            s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
            tcs,
            CancellationToken.None,
            TaskCreationOptions.PreferFairness,
            TaskScheduler.Default);
        tcs.Task.Wait();
    }

    /// <summary>
    /// Mark the event as not signaled.
    /// </summary>
    public void Reset()
    {
        while (true)
        {
            var tcs = this.taskCompletionSource;
            if (!tcs.Task.IsCompleted
#pragma warning disable 420
                || Interlocked.CompareExchange(
                    ref this.taskCompletionSource,
                    new TaskCompletionSource<bool>(),
                    tcs) == tcs)
#pragma warning restore 420
            {
                return;
            }
        }
    }

    /// <summary>
    /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
    /// </summary>
    /// <exception cref="T:System.AggregateException">
    /// The <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>
    /// was canceled -or- an exception was thrown during the execution
    /// of the <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>.
    /// </exception>
    public void Wait()
    {
        this.GetWaitTask().Wait();
    }

    /// <summary>
    /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
    /// </summary>
    /// <param name="cancellationToken">
    /// A <see cref="CancellationToken"/> to observe while waiting for
    /// the task to complete.
    /// </param>
    /// <exception cref="T:System.OperationCanceledException">
    /// The <paramref name="cancellationToken"/> was canceled.
    /// </exception>
    /// <exception cref="T:System.AggregateException">
    /// The <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/> was
    /// canceled -or- an exception was thrown during the execution of the
    /// <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>.
    /// </exception>
    public void Wait(CancellationToken cancellationToken)
    {
        this.GetWaitTask().Wait(cancellationToken);
    }

    /// <summary>
    /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
    /// </summary>
    /// <param name="cancellationToken">
    /// A <see cref="CancellationToken"/> to observe while waiting for
    /// the task to complete.
    /// </param>
    /// <param name="canceled">
    /// Set to true if the wait was canceled via the <paramref
    /// name="cancellationToken"/>.
    /// </param>
    public void Wait(CancellationToken cancellationToken, out bool canceled)
    {
        try
        {
            this.GetWaitTask().Wait(cancellationToken);
            canceled = false;
        }
        catch (Exception ex)
            when (ex is OperationCanceledException
                || (ex is AggregateException
                    && ex.InnerOf<OperationCanceledException>() != null))
        {
            canceled = true;
        }
    }

    /// <summary>
    /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
    /// </summary>
    /// <param name="timeout">
    /// A <see cref="System.TimeSpan"/> that represents the number of
    /// milliseconds to wait, or a <see cref="System.TimeSpan"/> that
    /// represents -1 milliseconds to wait indefinitely.
    /// </param>
    /// <returns>
    /// true if the <see cref="ManualResetEventAsync"/> was signaled within
    /// the allotted time; otherwise, false.
    /// </returns>
    /// <exception cref="T:System.ArgumentOutOfRangeException">
    /// <paramref name="timeout"/> is a negative number other than -1
    /// milliseconds, which represents an infinite time-out -or-
    /// timeout is greater than <see cref="int.MaxValue"/>.
    /// </exception>
    public bool Wait(TimeSpan timeout)
    {
        return this.GetWaitTask().Wait(timeout);
    }

    /// <summary>
    /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
    /// </summary>
    /// <param name="millisecondsTimeout">
    /// The number of milliseconds to wait, or
    /// <see cref="System.Threading.Timeout.Infinite"/> (-1) to wait
    /// indefinitely.
    /// </param>
    /// <returns>
    /// true if the <see cref="ManualResetEventAsync"/> was signaled within
    /// the allotted time; otherwise, false.
    /// </returns>
    /// <exception cref="T:System.ArgumentOutOfRangeException">
    /// <paramref name="millisecondsTimeout"/> is a negative number other
    /// than -1, which represents an infinite time-out.
    /// </exception>
    public bool Wait(int millisecondsTimeout)
    {
        return this.GetWaitTask().Wait(millisecondsTimeout);
    }

    /// <summary>
    /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
    /// </summary>
    /// <param name="millisecondsTimeout">
    /// The number of milliseconds to wait, or
    /// <see cref="System.Threading.Timeout.Infinite"/> (-1) to wait
    /// indefinitely.
    /// </param>
    /// <param name="cancellationToken">
    /// A <see cref="CancellationToken"/> to observe while waiting for the
    /// <see cref="ManualResetEventAsync"/> to be signaled.
    /// </param>
    /// <returns>
    /// true if the <see cref="ManualResetEventAsync"/> was signaled within
    /// the allotted time; otherwise, false.
    /// </returns>
    /// <exception cref="T:System.AggregateException">
    /// The <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>
    /// was canceled -or- an exception was thrown during the execution of
    /// the <see cref="ManualResetEventAsync"/> waiting <see cref="Task"/>.
    /// </exception>
    /// <exception cref="T:System.ArgumentOutOfRangeException">
    /// <paramref name="millisecondsTimeout"/> is a negative number other
    /// than -1, which represents an infinite time-out.
    /// </exception>
    /// <exception cref="T:System.OperationCanceledException">
    /// The <paramref name="cancellationToken"/> was canceled.
    /// </exception>
    public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken)
    {
        return this.GetWaitTask().Wait(millisecondsTimeout, cancellationToken);
    }

    /// <summary>
    /// Waits for the <see cref="ManualResetEventAsync"/> to be signaled.
    /// </summary>
    /// <param name="millisecondsTimeout">
    /// The number of milliseconds to wait, or
    /// <see cref="System.Threading.Timeout.Infinite"/> (-1) to wait
    /// indefinitely.
    /// </param>
    /// <param name="cancellationToken">
    /// A <see cref="CancellationToken"/> to observe while waiting for the
    /// <see cref="ManualResetEventAsync"/> to be signaled.
    /// </param>
    /// <param name="canceled">
    /// Set to true if the wait was canceled via the <paramref
    /// name="cancellationToken"/>.
    /// </param>
    /// <returns>
    /// true if the <see cref="ManualResetEventAsync"/> was signaled within
    /// the allotted time; otherwise, false.
    /// </returns>
    /// <exception cref="T:System.ArgumentOutOfRangeException">
    /// <paramref name="millisecondsTimeout"/> is a negative number other
    /// than -1, which represents an infinite time-out.
    /// </exception>
    public bool Wait(
        int millisecondsTimeout,
        CancellationToken cancellationToken,
        out bool canceled)
    {
        bool ret = false;
        try
        {
            ret = this.GetWaitTask().Wait(millisecondsTimeout, cancellationToken);
            canceled = false;
        }
        catch (Exception ex)
            when (ex is OperationCanceledException
                || (ex is AggregateException
                    && ex.InnerOf<OperationCanceledException>() != null))
        {
            canceled = true;
        }

        return ret;
    }
}

And ManualResetEventAsync uses the InnerOf<T> extension...

/// <summary>
///     Extension functions.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Finds the first exception of the requested type.
    /// </summary>
    /// <typeparam name="T">
    /// The type of exception to return
    /// </typeparam>
    /// <param name="ex">
    /// The exception to look in.
    /// </param>
    /// <returns>
    /// The exception or the first inner exception that matches the
    /// given type; null if not found.
    /// </returns>
    public static T InnerOf<T>(this Exception ex)
        where T : Exception
    {
        return (T)InnerOf(ex, typeof(T));
    }

    /// <summary>
    /// Finds the first exception of the requested type.
    /// </summary>
    /// <param name="ex">
    /// The exception to look in.
    /// </param>
    /// <param name="t">
    /// The type of exception to return
    /// </param>
    /// <returns>
    /// The exception or the first inner exception that matches the
    /// given type; null if not found.
    /// </returns>
    public static Exception InnerOf(this Exception ex, Type t)
    {
        if (ex == null || t.IsInstanceOfType(ex))
        {
            return ex;
        }

        var ae = ex as AggregateException;
        if (ae != null)
        {
            foreach (var e in ae.InnerExceptions)
            {
                var ret = InnerOf(e, t);
                if (ret != null)
                {
                    return ret;
                }
            }
        }

        return InnerOf(ex.InnerException, t);
    }
}

If you create the TaskCompletionSource with TaskCreationOptions.RunContinuationsAsynchronously | TaskCreationOptions.DenyChildAttach, I believe you can omit the Task.Factory.StartNew when setting the result. - Scott Chamberlain
@ScottChamberlain For me, specifying DenyChildAttach results in an ArgumentOutOfRangeException. - tm1
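
A minimal sketch of what the two comments above point towards, assuming .NET Framework 4.6 or later (where TaskCreationOptions.RunContinuationsAsynchronously exists). Only that one option is passed, given the reported exception with DenyChildAttach. The class name SimpleManualResetEventAsync is made up, and this is not the answer's class, just the Set/Reset core under that assumption.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the event; illustrates Set/Reset only.
public sealed class SimpleManualResetEventAsync
{
    private volatile TaskCompletionSource<bool> tcs = NewSource();

    // The task that waiters can block on or await.
    public Task WaitTask => this.tcs.Task;

    private static TaskCompletionSource<bool> NewSource()
    {
        // Continuations of this task are queued to the scheduler instead
        // of running inline on the thread that completes the task.
        return new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
    }

    // Marks the event as signaled; no Task.Factory.StartNew wrapper.
    public void Set()
    {
        this.tcs.TrySetResult(true);
    }

    // Marks the event as not signaled by swapping in a fresh source.
    public void Reset()
    {
        while (true)
        {
            var current = this.tcs;
            if (!current.Task.IsCompleted)
            {
                return;
            }

#pragma warning disable 420
            if (Interlocked.CompareExchange(
                ref this.tcs, NewSource(), current) == current)
#pragma warning restore 420
            {
                return;
            }
        }
    }
}
```
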

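I will say up front that this is a very minimal implementation, and I have not had time to really test its performance characteristics. There is probably just enough here for you to run some performance tests of your own. The idea that came to mind when I saw your question was to create a custom Stream that uses a BlockingCollection as its storage medium.

Basically, this gives you a Stream that can be read from and written to on different threads, and that throttles the producer when the consumer falls behind. I repeat: this is not a robust implementation, just a quick proof of concept. It needs a lot more error checking and argument validation, and a decent scheme for handling Close of the stream. Currently, if you close the stream while the underlying BlockingCollection still holds data, you can no longer read that data. If I have time tomorrow I will flesh it out a bit more, but perhaps you can give some feedback first.

Update: Yurik has posted an implementation of this solution as a community wiki answer; enhancements should be directed at that answer.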

  public class BlockingStream : Stream
  {
    private BlockingCollection<byte> _data;
    private CancellationTokenSource _cts = new CancellationTokenSource();
    private int _readTimeout = -1;
    private int _writeTimeout = -1;

    public BlockingStream(int maxBytes)
    {
      _data = new BlockingCollection<byte>(maxBytes);      
    }

    public override int ReadTimeout
    {
      get
      {
        return _readTimeout;
      }
      set
      {
        _readTimeout = value;
      }
    }

    public override int WriteTimeout
    {
      get
      {
        return _writeTimeout;
      }
      set
      {
        _writeTimeout = value;
      }
    }

    public override bool CanTimeout
    {
      get
      {
        return true;
      }
    }

    public override bool CanRead
    {
      get { return true; }
    }

    public override bool CanSeek
    {
      get { return false; }
    }

    public override bool CanWrite
    {
      get { return true; }
    }

    public override void Flush()
    {
      return;
    }

    public override long Length
    {
      get { throw new NotImplementedException(); }
    }

    public override long Position
    {
      get
      {
        throw new NotImplementedException();
      }
      set
      {
        throw new NotImplementedException();
      }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
      throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
      throw new NotImplementedException();
    }

    public override int ReadByte()
    {
      int returnValue = -1;
      try
      {
        byte b;
        if (_data.TryTake(out b, ReadTimeout, _cts.Token))
        {
          returnValue = (int)b;
        }
      }
      catch (OperationCanceledException)
      {
      }
      return returnValue;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
      int bytesRead = 0;
      byte b;
      try
      {
        while (bytesRead < count && _data.TryTake(out b, ReadTimeout, _cts.Token))
        {
          buffer[offset + bytesRead] = b;
          bytesRead++;
        }
      }
      catch (OperationCanceledException)
      {
        bytesRead = 0;
      }
      return bytesRead;
    }

    public override void WriteByte(byte value)
    {
      try
      {
        _data.TryAdd(value, WriteTimeout, _cts.Token);  
      }
      catch (OperationCanceledException)
      {
      }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
      try
      {
        for (int i = offset; i < offset + count; ++i)
        {
          _data.TryAdd(buffer[i], WriteTimeout, _cts.Token);
        }
      }
      catch (OperationCanceledException)
      {
      }
    }

    public override void Close()
    {
      _cts.Cancel();
      base.Close();
    }

    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);
      if (disposing)
      {
        _data.Dispose();
      }
    }
  }

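When you construct the stream, you pass in a buffer size that specifies how many bytes may be buffered before the writer blocks. Here is a simple test of the functionality; it is the only testing that has been done...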

  class Program
  {
    static BlockingStream _dataStream = new BlockingStream(10);
    static Random _rnd = new Random();
    [STAThread]
    static void Main(string[] args)
    {
      Task producer = new Task(() =>
        {
          Thread.Sleep(1000);
          for (int i = 0; i < 100; ++i)
          {
            // The upper bound of Random.Next is exclusive, so 256 covers 0..255.
            _dataStream.WriteByte((byte)_rnd.Next(0, 256));
          }          
        });

      Task consumer = new Task(() =>
        {
          int i = 0;
          while (true)
          {
            Console.WriteLine("{0} \t-\t {1}",_dataStream.ReadByte(), i++);
            // Slow the consumer down.
            Thread.Sleep(500);
          }
        });

      producer.Start();
      consumer.Start();

      Console.ReadKey();
    }
  }
