Fastest way to split a stream on a pattern


What is the optimal/fastest way to split a Stream into chunks delimited by a byte pattern (e.g. new byte[] { 0, 0 })?

My current, naive and slow implementation reads the stream byte by byte and decrements a counter each time the delimiter is encountered. When the counter reaches zero, it yields a memory chunk.

const int NUMBER_CONSECUTIVE_DELIMITER = 2;
const int DELIMITER = 0;

public IEnumerable<ReadOnlyMemory<byte>> Chunk(Stream stream)
{
    var chunk = new MemoryStream();

    try
    {
        int b; //the byte being read
        int c = NUMBER_CONSECUTIVE_DELIMITER;

        while ((b = stream.ReadByte()) != -1) //Read the stream byte by byte, -1 = end of the stream
        {
            chunk.WriteByte((byte)b); //Write this byte to the next chunk

            if (b == DELIMITER) 
                c--; //if we hit the delimiter (ie '0') decrement the counter
            else
                c = NUMBER_CONSECUTIVE_DELIMITER; //else, reset the counter

            if (c <= 0 || stream.Position == stream.Length) //we hit two subsequent '0's, or the end of the stream
            {
                var r = chunk.ToArray().AsMemory(); //parse it to a Memory<T>

                chunk.Dispose();
                chunk = new();
                c = NUMBER_CONSECUTIVE_DELIMITER; //reset the counter for the next chunk

                yield return r;
            }
        }
    }
    finally
    {
        chunk.Dispose();
    }
}

You could introduce a buffer and call Read/Write instead of ReadByte/WriteByte. Search the buffer for the delimiter, and handle correctly both the case where consecutive delimiter bytes span two buffers and the case where several delimiters occur within the same buffer. - Dialecticus
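
As an illustration of this suggestion, here is a minimal sketch of such a buffered approach (the ChunkBuffered and Find names are hypothetical, not from this thread). It keeps unmatched bytes in a pending list, so delimiters that span two Read calls are still found:

public static IEnumerable<byte[]> ChunkBuffered(Stream stream, byte[] delimiter, int bufferSize = 4096)
{
    var pending = new List<byte>(); // bytes read but not yet emitted
    var buffer = new byte[bufferSize];
    int bytesRead;

    while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
    {
        for (int i = 0; i < bytesRead; i++)
            pending.Add(buffer[i]);

        // Emit every complete chunk (including its trailing delimiter) found so far.
        int start = 0, hit;
        while ((hit = Find(pending, delimiter, start)) >= 0)
        {
            yield return pending.GetRange(start, hit + delimiter.Length - start).ToArray();
            start = hit + delimiter.Length;
        }

        pending.RemoveRange(0, start); // keep only the unmatched tail
    }

    if (pending.Count > 0) // trailing data after the last delimiter
        yield return pending.ToArray();
}

private static int Find(List<byte> data, byte[] pattern, int from)
{
    for (int i = from; i <= data.Count - pattern.Length; i++)
    {
        bool match = true;
        for (int j = 0; j < pattern.Length && match; j++)
            match = data[i + j] == pattern[j];
        if (match) return i;
    }
    return -1;
}
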
2 Answers

Such an implementation is quite hard to get right, because the stream has to be read out in fixed buffer sizes, and a buffer can be too big or too small for the content that is to be interpreted. To solve this problem, the ReadOnlySequence<T> struct was added. More information on this topic can be found here.
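
As a small illustration of what ReadOnlySequence<T> provides (this snippet is mine, not part of the original answer): it represents data that may be spread over several non-contiguous buffer segments, yet it can be searched and sliced without copying everything into one array:

using System.Buffers;

ReadOnlySequence<byte> sequence = new ReadOnlySequence<byte>(new byte[] { 1, 2, 0, 0, 3 });

SequencePosition? zero = sequence.PositionOf((byte)0);       // find the first 0
ReadOnlySequence<byte> head = sequence.Slice(0, zero.Value); // zero-copy slice: { 1, 2 }
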
By using System.IO.Pipelines (the package must be obtained), this problem can be solved as follows:
public static async Task FillPipeAsync(Stream stream, PipeWriter writer, CancellationToken cancellationToken = default)
{
    // The minimum buffer size that is used for the current buffer segment.
    const int bufferSize = 65536;

    while (true)
    {
        // Request 65536 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(bufferSize);

        // Read the content from the stream.
        int bytesRead = await stream.ReadAsync(memory, cancellationToken).ConfigureAwait(false);
        if (bytesRead == 0) break;

        // Tell the writer how many bytes are read.
        writer.Advance(bytesRead);

        // Flush the data to the PipeWriter.
        FlushResult result = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
        if (result.IsCompleted) break;
    }

    // This enables our reading process to be notified that no more new data is coming.
    await writer.CompleteAsync().ConfigureAwait(false);
}

这将异步读取您的流并将缓冲段写入管道。 接下来,您需要实现读取逻辑,将连接的缓冲段切片/合并为块:
public static async IAsyncEnumerable<ReadOnlySequence<byte>> ReadPipeAsync(PipeReader reader, ReadOnlyMemory<byte> delimiter,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (true)
    {
        // Read from the PipeReader.
        ReadResult result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadChunk(ref buffer, delimiter.Span, out ReadOnlySequence<byte> chunk))
            yield return chunk;

        // Tell the PipeReader how many bytes are read.
        // This is essential because the Pipe will release buffer segments that are no longer in use.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Take care of the complete notification and return the last buffer. UPDATE: Corrected issue 2/.
        if (result.IsCompleted)
        {
            yield return buffer;
            break;
        }
    }

    await reader.CompleteAsync().ConfigureAwait(false);
}

private static bool TryReadChunk(ref ReadOnlySequence<byte> buffer, ReadOnlySpan<byte> delimiter,
    out ReadOnlySequence<byte> chunk)
{
    // Scratch space used to compare a candidate match against the full delimiter,
    // even when the candidate spans multiple buffer segments.
    Span<byte> candidate = stackalloc byte[delimiter.Length];

    // Search for the first byte of the delimiter, skipping occurrences that are
    // not followed by the rest of the delimiter. UPDATE: Corrected issue 3/.
    ReadOnlySequence<byte> search = buffer;
    SequencePosition? position;

    while ((position = search.PositionOf(delimiter[0])) is not null
        && search.Slice(position.Value).Length >= delimiter.Length)
    {
        search.Slice(position.Value, delimiter.Length).CopyTo(candidate);

        if (candidate.SequenceEqual(delimiter))
        {
            // Return the calculated chunk and update the buffer to cut the start.
            chunk = buffer.Slice(0, position.Value);
            buffer = buffer.Slice(buffer.GetPosition(delimiter.Length, position.Value));
            return true;
        }

        // The first byte matched but the rest did not; keep searching after it.
        search = search.Slice(search.GetPosition(1, position.Value));
    }

    // No complete delimiter was found in the buffered data yet.
    chunk = default;
    return false;
}

For this to work in this form, you must use IAsyncEnumerable so that the chunks can be streamed into a foreach loop. The merging and slicing is largely handled by the pipe, so a reliable algorithm can be built here with relatively little code, and it performs well. Usage:
// Create a Pipe that manages the buffer.
Pipe pipe = new Pipe();
ConfiguredTaskAwaitable writing = FillPipeAsync(stream, pipe.Writer).ConfigureAwait(false);

// The delimiter that should be used. This can be any data with length > 0.
ReadOnlyMemory<byte> delimiter = new ReadOnlyMemory<byte>(new byte[] { 0, 0 });

// 'await foreach' and 'await writing' are executed asynchronously (in parallel).
await foreach (ReadOnlySequence<byte> chunk in ReadPipeAsync(pipe.Reader, delimiter))
{
    // Use "chunk" to retrieve your chunked content.
};

await writing;

Note that reading and chunking are done asynchronously and independently of each other.

Philipp, this answer is great - I am working through some edge cases and will share an updated code snippet soon. - Wouter Van Ranst
Philipp, there are a few issues with your answer that I could not resolve to my satisfaction, which may be down to my implicit requirements: 1/ there is a hard break at 65k bytes (reader.Buffer), which is undesirable; 2/ the "last" chunk is lost (i.e. the part after the last "00"); 3/ it also unexpectedly splits on a single "0" (but that is just a small bug to correct). - Wouter Van Ranst
@WouterVanRanst I have addressed the mentioned issues and included some documentation to make it clearer. Regarding 1/: the hard break may be an artifact of the underlying stream. For example, a NetworkStream's Socket is limited to a buffer size of 64k. I tried it with a MemoryStream and could not find any buffer limit. - Philipp Ape
Thanks Philipp, I will take a look. I am using a FileStream. I did try a larger buffer size, but there is no guarantee that the Stream will be smaller than the buffer. - Wouter Van Ranst


Eventually I went with the code below, heavily inspired by Philipp's answer above and by https://keestalkstech.com/2010/11/seek-position-of-a-string-in-a-file-or-filestream/.

public override IEnumerable<byte[]> Chunk(Stream stream)
{
    var buffer = new byte[bufferSize];
    var size = bufferSize;
    var offset = 0;
    var position = stream.Position;
    var nextChunk = Array.Empty<byte>();

    while (true)
    {
        var bytesRead = stream.Read(buffer, offset, size);

        // when no bytes are read, the end of the stream has been reached
        if (bytesRead <= 0)
            break;

        // when fewer than size bytes are read, slice the buffer so we do not interpret stale bytes from a previous read
        ReadOnlySpan<byte> ro = buffer;
        if (bytesRead < size)
            ro = ro.Slice(0, offset + bytesRead);

        // check if we can find our search bytes in the buffer
        var i = ro.IndexOf(Delimiter);
        if (i > -1 &&  // we found something
            i <= bytesRead &&  // we found something in the area that was read (at the end of the buffer, the last values are not overwritten); i == bytesRead if the delimiter is at the end of the buffer
            nextChunk.Length + (i + Delimiter.Length - offset) >= MinChunkSize)  //the size of the chunk that will be made is large enough
        {
            var chunk = buffer[offset..(i + Delimiter.Length)];
            yield return Concat(nextChunk, chunk);

            nextChunk = Array.Empty<byte>();

            offset = 0;
            size = bufferSize;
            position += i + Delimiter.Length;
            stream.Position = position;
            continue;
        }
        else if (stream.Position == stream.Length)
        {
            // we're at the end of the stream
            var chunk = buffer[offset..(bytesRead + offset)]; //return the bytes read
            yield return Concat(nextChunk, chunk);

            break;
        }

        // the stream is not finished; copy the last Delimiter.Length bytes to the start of the buffer and set the offset so the next read fills the buffer after them
        nextChunk = Concat(nextChunk, buffer[offset..buffer.Length]);

        offset = Delimiter.Length;
        size = bufferSize - offset;
        Array.Copy(buffer, buffer.Length - offset, buffer, 0, offset);
        position += bufferSize - offset;
    }
}
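
The method above references a few members that are not shown in the post (bufferSize, Delimiter, MinChunkSize and a Concat helper). A plausible sketch of these, assumed here only so the snippet is self-contained:

// Assumed members, not shown in the original post.
private readonly int bufferSize = 65536;      // size of the read buffer
private readonly byte[] Delimiter = { 0, 0 }; // the byte pattern to split on
private readonly int MinChunkSize = 0;        // minimum size before a chunk is emitted

private static byte[] Concat(byte[] first, byte[] second)
{
    var result = new byte[first.Length + second.Length];
    Buffer.BlockCopy(first, 0, result, 0, first.Length);
    Buffer.BlockCopy(second, 0, result, first.Length, second.Length);
    return result;
}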
