我正在尝试使用 TPL Dataflow
实现数据处理管道。然而,我对数据流相对较新,不完全确定如何正确地使用它来解决我的问题。
问题:
我正在尝试遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据。每个文件的大小约为 700MB
到 1GB
。每个文件包含 JSON
数据。为了并行处理这些文件并且不会用尽内存,我正在尝试使用带有 yield return
的 IEnumerable<>
,然后进一步处理数据。
一旦我获得文件列表,我想要同时并行处理最多 4-5 个文件。我的困惑来自于:
- 如何在
async/await
和数据流中使用IEnumerable<>
和yeild return
。遇到了svick提出的这个答案,但仍不确定如何将IEnumerable<>
转换为ISourceBlock
,然后将所有块连接在一起并跟踪完成情况。 - 在我的情况下,
producer
将非常快(遍历文件列表),但是consumer
将非常慢(处理每个文件-读取数据,反序列化JSON)。在这种情况下,如何跟踪完成情况。 - 我应该使用数据块的
LinkTo
特性连接各种块吗?还是使用诸如OutputAvailableAsync()
和ReceiveAsync()
的方法将数据从一个块传播到另一个块。
代码:
private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
PrepareDataflow(token);
var bufferTask = ListFilesAsync(_fileBufferBlock, token);
var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
return Task.WhenAll(tasks);
}
private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
...
// Get list of file Uris
...
foreach(var fileNameUri in fileNameUris)
await targetBlock.SendAsync(fileNameUri, token);
targetBlock.Complete();
}
private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
var httpClient = new HttpClient();
try
{
using (var stream = await httpClient.GetStreamAsync(fileNameUri))
using (var sr = new StreamReader(stream))
using (var jsonTextReader = new JsonTextReader(sr))
{
while (jsonTextReader.Read())
{
if (jsonTextReader.TokenType == JsonToken.StartObject)
{
try
{
var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
await _messageBufferBlock.SendAsync(data, token);
}
catch (Exception ex)
{
_logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
}
}
}
}
}
catch(Exception ex)
{
// Should throw?
// Or if converted to block then report using Fault() method?
}
finally
{
httpClient.Dispose();
buffer.Complete();
}
}
private void PrepareDataflow(CancellationToken token)
{
_fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
{
CancellationToken = token
});
var actionExecuteOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = ProcessingSize,
MaxMessagesPerTask = 1,
MaxDegreeOfParallelism = ProcessingSize
};
_processingBlock = new ActionBlock<string>(async fileName =>
{
try
{
await ProcessFileAsync(fileName, token);
}
catch (Exception ex)
{
_logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
// Should fault the block?
}
}, actionExecuteOptions);
_fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
_messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = 50000
});
_messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}
在上面的代码中,我没有使用 IEnumerable<DataType>
和 yield return
,因为它不能与 async/await
一起使用。所以我将输入缓冲区连接到 ActionBlock<DataType>
,然后再将其发布到另一个队列。但是,通过使用 ActionBlock<>
,我无法将其链接到下一个用于处理的块,并且必须手动从 ActionBlock<>
发布/发送异步消息到 BufferBlock<>
。此外,在这种情况下,不确定如何跟踪完成状态。这段代码可以工作,但是我相信肯定有更好的解决方案,我可以将所有的块链接在一起(而不是使用
ActionBlock<DataType>
,然后从中发送消息到 BufferBlock<DataType>
)。另一种选择是使用
Rx
将 IEnumerable<>
转换为 IObservable<>
,但我对 Rx
不太熟悉,不知道如何正确地混合使用 TPL Dataflow
和 Rx
。