FIFO/Queue缓冲器专为字节流而设计

16
有没有.NET数据结构/类的组合,允许将字节数据附加到缓冲区的末尾,但所有的“Peek”和“Read”都从开头进行,读取时缩短缓冲区? MemoryStream类似乎完成了部分工作,但我需要维护不同的读写位置,并且在读取后不会自动丢弃开始的数据。
在回答此问题之后发布了一个答案,基本上就是我正在尝试做的事情,但我希望能够在同一进程的不同组件上执行异步I/O,就像普通管道或网络流一样(我需要先过滤/处理数据)。

1
在读取缓冲区时来回跳跃有什么问题吗? - Ry-
只需要记住我所说的话,而不是像NetworkStream那样一遍又一遍地读取。 - Deanna
你需要读写不同大小的数组吗?一个byte[]队列对你来说足够好了吗? - svick
读取将是单个字节,直到我发现某些数据,然后是各种长度的块。写入将根据我接收到的数据而有任意长度。 - Deanna
5个回答

11

我将发布一份我曾经在工作项目中编写的逻辑的简化版本。这个版本的优点是它使用链表来保存缓冲的数据,因此读取时不必缓存大量内存和/或复制内存。此外,它是线程安全的,并且行为类似网络流:当没有数据可用时,读取会一直等待,直到有数据可用或超时。另外,如果请求读取 x 个字节而当前只有 y 个字节可用(y < x),它会在读完这 y 个字节后立即返回。希望这可以帮到你!

    /// <summary>
    /// In-process FIFO byte stream. Write appends a private copy of the caller's data to the
    /// tail; Read consumes bytes from the head and discards what it returns.
    /// A read blocks until at least one byte is available or <see cref="ReadTimeout"/> elapses,
    /// and returns fewer bytes than requested when less data is currently queued.
    /// </summary>
    public class SlidingStream : Stream
{
    #region Other stream member implementations

    ...

    #endregion Other stream member implementations

    public SlidingStream()
    {
        ReadTimeout = -1;
    }

    // One lock guards both the segment list and the event state. LinkedList<T> is not
    // thread-safe, so readers and writers must never mutate it under different locks.
    private readonly object _syncRoot = new object();
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>();
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim();

    /// <summary>Milliseconds a read waits for data to arrive; -1 waits indefinitely.</summary>
    public int ReadTimeout { get; set; }

    /// <summary>
    /// Copies up to <paramref name="count"/> queued bytes into <paramref name="buffer"/>
    /// starting at <paramref name="offset"/>, removing them from the stream.
    /// </summary>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte written.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes copied; 0 only when <paramref name="count"/> is 0.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The range lies outside <paramref name="buffer"/>.</exception>
    /// <exception cref="TimeoutException">No data arrived within <see cref="ReadTimeout"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0 || count < 0 || buffer.Length - offset < count)
        {
            throw new ArgumentOutOfRangeException(nameof(count), "offset and count must describe a range inside buffer");
        }

        if (count == 0)
        {
            return 0;
        }

        while (true)
        {
            // Wait returns false on timeout; only then is there really no data.
            if (!_dataAvailableResetEvent.Wait(ReadTimeout))
            {
                throw new TimeoutException("No data available");
            }

            lock (_syncRoot)
            {
                if (_pendingSegments.Count == 0)
                {
                    // Another reader drained the queue between our wait and taking the lock.
                    // The event was reset by that reader, so go back and wait again
                    // (the timeout restarts for this retry).
                    continue;
                }

                int copied = 0;

                while (copied < count && _pendingSegments.Count > 0)
                {
                    ArraySegment<byte> segment = _pendingSegments.First.Value;
                    _pendingSegments.RemoveFirst();

                    int chunk = Math.Min(count - copied, segment.Count);
                    Array.Copy(segment.Array, segment.Offset, buffer, offset + copied, chunk);
                    copied += chunk;

                    if (chunk < segment.Count)
                    {
                        // Keep the unread tail at the head of the queue without copying it.
                        _pendingSegments.AddFirst(
                            new ArraySegment<byte>(segment.Array, segment.Offset + chunk, segment.Count - chunk));
                    }
                }

                if (_pendingSegments.Count == 0)
                {
                    _dataAvailableResetEvent.Reset();
                }

                return copied;
            }
        }
    }

    /// <summary>
    /// Appends a copy of <paramref name="count"/> bytes from <paramref name="buffer"/>,
    /// starting at <paramref name="offset"/>, to the end of the stream and wakes waiting readers.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (count == 0)
        {
            // An empty segment would wake a reader that then has nothing to return.
            return;
        }

        // Copy outside the lock; the caller may reuse its buffer after Write returns.
        byte[] copy = new byte[count];
        Array.Copy(buffer, offset, copy, 0, count);

        lock (_syncRoot)
        {
            _pendingSegments.AddLast(new ArraySegment<byte>(copy));
            _dataAvailableResetEvent.Set();
        }
    }
}

1
看起来不错,而且这正是我要走的路。我今晚会试一下。 - Deanna
如果尝试读取数据时没有可用数据,我觉得这会导致崩溃。 - svick
@svick - 完全正确,这只是一个草稿,没有任何参数验证等。 manualResetEvent 就是为此而存在的,我只是忘记在 read 方法开始时等待它了。现在已经修复了。感谢您的提醒。 - Polity
这看起来非常不错。我本来也打算做类似的事情,但是我先在谷歌上搜索了一下。 @Deanna 你把你的衍生版本发布到哪里了吗?你能发一下吗? 很想看看你对它做了什么。 异常处理等 - Jonas
我也找到了这个。虽然链接以“java”开头,但它是C#。 http://www.java2s.com/Code/CSharp/File-Stream/FifoStream.htm - Jonas
显示剩余4条评论

1
代码可以比接受的答案更简单。没有必要使用for循环。
/// <summary>
/// First-in/first-out byte buffer. Every member takes the same lock on the backing list,
/// so individual calls do not interleave with one another.
/// </summary>
public class FastFifo
{
    private List<Byte> _bytes = new List<Byte>();

    /// <summary>
    /// Number of bytes currently stored in the buffer.
    /// </summary>
    public int Count
    {
        get
        {
            int size;
            lock (_bytes)
            {
                size = _bytes.Count;
            }
            return size;
        }
    }

    /// <summary>
    /// Removes every byte from the buffer.
    /// </summary>
    public void Clear()
    {
        lock (_bytes)
        {
            _bytes.Clear();
        }
    }

    /// <summary>
    /// Appends all bytes of <paramref name="u8_Data"/> to the end of the buffer.
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (_bytes)
        {
            _bytes.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Removes and returns the first <paramref name="s32_Count"/> bytes of the buffer.
    /// Returns null, leaving the buffer untouched, when fewer bytes than that are stored.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (_bytes)
        {
            if (s32_Count <= _bytes.Count)
            {
                var head = new Byte[s32_Count];
                _bytes.CopyTo(0, head, 0, s32_Count);
                _bytes.RemoveRange(0, s32_Count);
                return head;
            }

            return null;
        }
    }

    /// <summary>
    /// Returns the byte at <paramref name="s32_Index"/> without removing it,
    /// or -1 when the index is negative or past the end of the buffer.
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (_bytes)
        {
            bool inRange = s32_Index >= 0 && s32_Index < _bytes.Count;
            return inRange ? _bytes[s32_Index] : -1;
        }
    }
}

这基本上与链接的问题相同,无法满足异步或阻塞的需求。不过还是谢谢。 - Deanna
1
好的,但那段代码不够优雅,也不是线程安全的。你可以用6行代码来实现,而不需要16行。 - Elmue

0

这里是一个希望没有竞态条件的版本,使用SemaphoreSlim进行通知:

/// <summary>
/// In-process FIFO byte stream. Each Write enqueues a private copy of the data as one
/// segment and releases a semaphore permit; Read consumes segments in order, blocking
/// only while no byte at all is available.
/// </summary>
public class SlidingStream : Stream
{
    private readonly object _writeLock = new object();
    private readonly object _readLock = new object();
    private readonly ConcurrentQueue<byte[]> _pendingSegments = new ConcurrentQueue<byte[]>();

    // Segment currently being consumed and the index of its first unread byte.
    // Tracking an offset avoids re-copying the remainder after every partial read.
    private byte[] _extraSegment = null;
    private int _extraOffset = 0;

    // One permit per segment sitting in _pendingSegments.
    private readonly SemaphoreSlim _smSegmentsAvailable = new SemaphoreSlim(0);

    /// <summary>
    /// Appends a copy of <paramref name="count"/> bytes from <paramref name="buffer"/>,
    /// starting at <paramref name="offset"/>, to the end of the stream.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        // Copy first so the caller may reuse its buffer; Array.Copy validates the arguments.
        var copy = new byte[count];
        Array.Copy(buffer, offset, copy, 0, count);

        if (count == 0)
        {
            return;
        }

        lock (_writeLock)
        {
            _pendingSegments.Enqueue(copy);
            _smSegmentsAvailable.Release(1);
        }
    }

    /// <summary>
    /// Performs <see cref="Write(byte[], int, int)"/> synchronously and returns a completed task,
    /// or a canceled task when <paramref name="cancel"/> is already canceled.
    /// </summary>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel)
    {
        if (cancel.IsCancellationRequested)
        {
            return Task.FromCanceled(cancel);
        }

        Write(buffer, offset, count);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Reads up to <paramref name="bytesToRead"/> bytes into <paramref name="buffer"/> at
    /// <paramref name="offset"/>. Blocks until at least one byte is available, then returns
    /// whatever is queued without waiting for more.
    /// </summary>
    /// <returns>The number of bytes read; 0 only when <paramref name="bytesToRead"/> is 0.</returns>
    public override int Read(byte[] buffer, int offset, int bytesToRead)
    {
        return ReadCore(buffer, offset, bytesToRead, CancellationToken.None);
    }

    /// <summary>
    /// Same as <see cref="Read(byte[], int, int)"/> but the initial wait for data observes
    /// <paramref name="cancellationToken"/>. The read itself still runs on the calling thread.
    /// </summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled<int>(cancellationToken);
        }

        //could be extended here with a proper async read
        var result = ReadCore(buffer, offset, count, cancellationToken);
        return Task.FromResult(result);
    }

    // Shared implementation of Read/ReadAsync.
    private int ReadCore(byte[] buffer, int offset, int bytesToRead, CancellationToken cancellationToken)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0 || bytesToRead < 0 || buffer.Length - offset < bytesToRead)
        {
            throw new ArgumentOutOfRangeException(nameof(bytesToRead), "offset and count must describe a range inside buffer");
        }

        lock (_readLock)
        {
            var bytesRead = 0;

            while (bytesToRead > 0)
            {
                if (_extraSegment == null)
                {
                    if (bytesRead > 0)
                    {
                        // Something was already delivered: take another segment only if one
                        // is ready right now, and never throw away the bytes copied so far.
                        if (!_smSegmentsAvailable.Wait(0))
                        {
                            return bytesRead;
                        }
                    }
                    else
                    {
                        _smSegmentsAvailable.Wait(cancellationToken);
                    }

                    if (!_pendingSegments.TryDequeue(out _extraSegment))
                    {
                        throw new InvalidOperationException("No segment found, despite semaphore");
                    }

                    _extraOffset = 0;
                }

                var copyCount = Math.Min(bytesToRead, _extraSegment.Length - _extraOffset);
                Array.Copy(_extraSegment, _extraOffset, buffer, offset + bytesRead, copyCount);
                bytesToRead -= copyCount;
                bytesRead += copyCount;
                _extraOffset += copyCount;

                if (_extraOffset == _extraSegment.Length)
                {
                    _extraSegment = null;
                    _extraOffset = 0;
                }
            }

            return bytesRead;
        }
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _smSegmentsAvailable.Dispose();
        }

        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    public override long Seek(long offset, SeekOrigin origin)
        => throw new NotSupportedException();

    public override void SetLength(long value)
        => throw new NotSupportedException();

    // Nothing is buffered outside the segment queue, so there is nothing to flush.
    public override void Flush() {}

    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
}

0

我试着对Polity的代码进行优化。虽然离最佳状态还有一段距离,但也许能够正常运行。

/// <summary>
/// In-process FIFO byte stream. Write enqueues a private copy of the data; Read consumes
/// from the head. A read blocks (up to <see cref="ReadTimeout"/>) only while no byte has
/// been delivered yet, and returns 0 once the stream is closed and drained.
/// </summary>
public class SlidingStream : Stream {
  public SlidingStream() {
  }

  private readonly object ReadSync = new object();
  private readonly object WriteSync = new object();
  private readonly ConcurrentQueue<ArraySegment<byte>> PendingSegments
    = new ConcurrentQueue<ArraySegment<byte>>();
  private readonly ManualResetEventSlim DataAvailable = new ManualResetEventSlim(false);

  // Unread tail of the segment most recently handed to Read; only touched under ReadSync.
  private ArraySegment<byte>? PartialSegment;

  /// <summary>
  /// Milliseconds a read waits for the first byte; -1 (the default) waits indefinitely.
  /// Overrides the base property so the value is also visible through a Stream reference.
  /// </summary>
  public override int ReadTimeout { get; set; } = -1;

  public override bool CanTimeout => true;

  public override bool CanRead => true;

  public override bool CanSeek => false;

  public override bool CanWrite => true;

  public override long Length => throw new NotImplementedException();

  public override long Position {
    get => throw new NotImplementedException();
    set => throw new NotImplementedException();
  }

  // Written by Close on one thread and polled by a blocked reader on another.
  private volatile bool Closed;

  /// <summary>Marks the stream closed and wakes a blocked reader so it can return.</summary>
  public override void Close() {
    Closed = true;
    DataAvailable.Set();
    base.Close();
  }

  /// <summary>
  /// Reads up to <paramref name="count"/> bytes into <paramref name="buffer"/> at
  /// <paramref name="offset"/>. Waits for data only while nothing has been read yet, so
  /// bytes already consumed are never lost to a later timeout.
  /// </summary>
  /// <returns>The number of bytes read; 0 when the stream is closed and empty.</returns>
  /// <exception cref="TimeoutException">No data arrived within <see cref="ReadTimeout"/>.</exception>
  public override int Read(byte[] buffer, int offset, int count) {
    if (buffer == null) {
      throw new ArgumentNullException(nameof(buffer));
    }
    if (offset < 0 || count < 0 || buffer.Length - offset < count) {
      throw new ArgumentOutOfRangeException(nameof(count), "offset and count must describe a range inside buffer");
    }

    int msStart = Environment.TickCount;

    lock (ReadSync) {
      int read = 0;

      while (read < count) {
        ArraySegment<byte>? seg = TryDequeue(msStart, read == 0);
        if (seg == null) {
          return read;
        }

        ArraySegment<byte> segment = seg.GetValueOrDefault();
        int bite = Math.Min(count - read, segment.Count);
        if (bite < segment.Count) {
          PartialSegment = new ArraySegment<byte>(
            segment.Array,
            segment.Offset + bite,
            segment.Count - bite
          );
        }

        Array.Copy(segment.Array, segment.Offset, buffer, offset + read, bite);
        read += bite;
      }

      return read;
    }
  }

  // Returns the next segment to consume, or null when none can be supplied.
  // With block == false the call never waits; with block == true it waits for a
  // writer, returning null only after the stream has been closed.
  private ArraySegment<byte>? TryDequeue(int msStart, bool block) {
    ArraySegment<byte>? ps = PartialSegment;
    if (ps.HasValue) {
      PartialSegment = null;
      return ps;
    }

    ArraySegment<byte> segment;
    if (!block) {
      return PendingSegments.TryDequeue(out segment) ? segment : (ArraySegment<byte>?)null;
    }

    // Reset before polling: a writer that enqueues after this point sets the event again,
    // so the wait below cannot miss it.
    DataAvailable.Reset();

    while (!PendingSegments.TryDequeue(out segment)) {
      if (Closed) {
        return null;
      }
      WaitDataOrTimeout(msStart);
    }

    return segment;
  }

  // Waits for the data event for whatever remains of ReadTimeout, measured from msStart.
  private void WaitDataOrTimeout(int msStart) {
    int timeout;
    if (ReadTimeout == -1) {
      timeout = -1;
    }
    else {
      // Unchecked subtraction stays correct when TickCount wraps around. Clamp at zero:
      // a negative remainder would mean "wait forever" (-1) or be rejected by Wait (< -1).
      int elapsed = unchecked(Environment.TickCount - msStart);
      timeout = Math.Max(0, ReadTimeout - elapsed);
    }

    if (!DataAvailable.Wait(timeout)) {
      throw new TimeoutException("No data available");
    }
  }

  /// <summary>
  /// Appends a copy of <paramref name="count"/> bytes from <paramref name="buffer"/>,
  /// starting at <paramref name="offset"/>, and wakes a waiting reader.
  /// </summary>
  public override void Write(byte[] buffer, int offset, int count) {
    byte[] copy = new byte[count];
    Array.Copy(buffer, offset, copy, 0, count);

    if (count == 0) {
      return;
    }

    lock (WriteSync) {
      PendingSegments.Enqueue(new ArraySegment<byte>(copy));

      DataAvailable.Set();
    }
  }

  // Nothing is buffered outside the queue; a no-op keeps wrappers that flush
  // their underlying stream from failing.
  public override void Flush() {
  }

  public override long Seek(long offset, SeekOrigin origin) {
    throw new NotImplementedException();
  }

  public override void SetLength(long value) {
    throw new NotImplementedException();
  }
}

0
一个晚回答,但自 .NET Framework 2.0 版本以来就支持 队列。使用 ConcurrentQueue 进行线程安全操作。
我创建了下面的 Stream 实现,它会在字节可用时执行 Read 和 ReadLines。这不是最好的实现,但应该能胜任工作。
/// <summary>
/// Stream over a concurrent byte queue: Write enqueues bytes one by one, Read and
/// <see cref="ReadLines"/> dequeue them in the same order. Read never blocks.
/// </summary>
public class QueueStream : Stream
{
    protected readonly ConcurrentQueue<byte> Queue = new ConcurrentQueue<byte>();

    /// <summary>
    /// Optional task that produces the data. <see cref="ReadLines"/> waits for more bytes
    /// only while this task is set and has not completed.
    /// </summary>
    public Task? DownloadTask { get; set; }

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => true;

    /// <summary>Number of bytes currently queued.</summary>
    public override long Length => Queue.Count;

    public override long Position
    {
        get => 0;
        set => throw new NotImplementedException($"{nameof(QueueStream)} is not seekable");
    }

    /// <summary>
    /// Does nothing: written bytes are immediately visible to readers. Flushing must not
    /// discard unread data, otherwise a writer wrapper that flushes after each write
    /// would erase what it just wrote. Use <see cref="Clear"/> to drop queued bytes.
    /// </summary>
    public override void Flush()
    {
    }

    /// <summary>Discards every byte currently queued.</summary>
    public void Clear()
    {
        // Drain manually; ConcurrentQueue<T>.Clear() is not available on .NET Framework.
        while (Queue.TryDequeue(out _))
        {
        }
    }

    /// <summary>
    /// Dequeues up to <paramref name="count"/> bytes into <paramref name="buffer"/>,
    /// starting at index <paramref name="offset"/>.
    /// </summary>
    /// <returns>The number of bytes stored; 0 when the queue is empty.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        if (buffer.Length - offset < count)
        {
            throw new ArgumentException($"{nameof(offset)} and {nameof(count)} exceed the length of {nameof(buffer)}");
        }

        var insertedCount = 0;

        // Test the count first: dequeuing before the check would remove a byte
        // from the queue that is then never delivered to anyone.
        while (insertedCount < count && Queue.TryDequeue(out var b))
        {
            buffer[offset + insertedCount] = b;
            insertedCount++;
        }

        return insertedCount;
    }

    /// <summary>
    /// Reads up to <paramref name="numberOfLines"/> lines. "\r", "\n" and "\r\n" each end a
    /// line and are returned as <see cref="Environment.NewLine"/>; every other byte is
    /// appended as the character with the same code.
    /// Stops early when the queue is empty and <see cref="DownloadTask"/> is null or
    /// completed, or as soon as <see cref="DownloadTask"/> is faulted.
    /// </summary>
    public string ReadLines(int numberOfLines = 1)
    {
        var currentLine = 0;
        var stringBuilder = new StringBuilder();

        while (currentLine < numberOfLines)
        {
            var task = DownloadTask;
            if (task != null && task.IsFaulted)
            {
                break;
            }

            if (!TryWaitForByte(out var byteResult, true))
            {
                break;
            }

            if (byteResult == '\r' || byteResult == '\n')
            {
                if (byteResult == '\r')
                {
                    // Swallow the '\n' of a "\r\n" pair so it is not counted as a second line.
                    if (TryWaitForByte(out var peekResult, false) && peekResult == '\n')
                    {
                        Queue.TryDequeue(out _);
                    }
                }

                stringBuilder.Append(Environment.NewLine);
                currentLine++;
                continue;
            }

            stringBuilder.Append((char)byteResult);
        }

        return stringBuilder.ToString();
    }

    // Gets the next byte, removing it when remove is true and peeking otherwise.
    // Waits while the queue is empty and DownloadTask is still running; returns false
    // once the queue is empty and no producer is left.
    private bool TryWaitForByte(out byte value, bool remove)
    {
        var spinner = new System.Threading.SpinWait();

        while (true)
        {
            if (remove ? Queue.TryDequeue(out value) : Queue.TryPeek(out value))
            {
                return true;
            }

            var task = DownloadTask;
            if (task == null || task.IsCompleted)
            {
                // One last look: the producer may have enqueued just before finishing.
                return remove ? Queue.TryDequeue(out value) : Queue.TryPeek(out value);
            }

            // Back off instead of burning a core while the producer catches up.
            spinner.SpinOnce();
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Enqueues <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        var forEnd = offset + count;
        for (var index = offset; index < forEnd; index++)
        {
            Queue.Enqueue(buffer[index]);
        }
    }
}

请注意,.NET Framework 中根本没有 ConcurrentQueue<>.Clear() 方法。 - AntonK
在“Read”函数中存在一个bug,可能会在读取时跳过一些字节:循环条件应该按照以下方式重新排序- while (insertedCount < count && Queue.TryDequeue(out var b)) ... - AntonK

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接