TransformBlock 发布到输出

6
我的情景是我有一个BufferBlock<Stream>,从外部源接收Stream,比如文件系统或一些FTP服务器。这些文件Stream将传递到另一个块并进行处理。
唯一的问题是其中一些文件被压缩了,我想在中间添加一个Block,以便在必要时解压文件,并为每个条目创建多个输出Stream
然而,我不想使用TransformBlockMany,因为这意味着我必须完全接收ZIP Stream,并立即创建输出Stream数组。
我希望这个Block接收ZIP Stream,开始解压缩,并在准备好一个Entry时Push到下一个流,这样处理块就可以在第一个文件解压缩后立即开始处理,而不必等待所有内容都解压缩完毕。
我该怎么做呢?

3
你使用哪个库进行ZIP解压缩? - Tigran Topchyan
2
使用 System.IO.Compression.ZipFile。 - Gideon
到目前为止,我理解我的问题实际上是异步部分。如果我不使用异步,我可能只需在TransformManyBlock中使用yield return。但是我无法将yield return与async一起使用。 - Gideon
你能分享一些代码吗? - Tigran Topchyan
相关:是否可能让任何数据流块类型在单个输入的结果中发送多个中间结果?。顺便提一下,如果您需要限制内存使用,请注意内置TransformManyBlock的输出队列是无界的。这里有一个相关主题。 - Theodor Zoulias
2个回答

1

我明白我的问题是不能同时使用yield/async。但是在重构后,我摆脱了这种需要,并提出了以下(简化)版本:

var block = new TransformManyBlock<Stream, Stream>((input) => {
var archive = new System.IO.Compression.ZipArchive(input, System.IO.Compression.ZipArchiveMode.Read, true);
foreach (ZipArchiveEntry entry in archive.Entries)
{
    if (string.IsNullOrWhiteSpace(entry.Name)) //is a folder
        continue;

    yield return entry.Open();

}

});

0
您可以使用谓词链接设置中间块,以便检查流是否为存档文件,从而为您的解压逻辑做好准备,例如:
var buffer = new BufferBlock<Stream>();
var unzipper = new TransformManyBlock<Stream, Stream>(input => { /* unzip here */ });
var processBlock = new ActionBlock<Stream>(input => { /* process streams here */ });

buffer.LinkTo(unzipper, input => /* check is stream a zip archive */);
unzipper.LinkTo(processBlock);
buffer.LinkTo(processBlock);

关于与 yield 一起使用的async,您可以尝试使用 GitHubNuGet 上可用的 AsyncEnumerable 包。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接