使用TPL Dataflow与async/await和yield return

17

我正在尝试使用 TPL Dataflow 实现数据处理管道。然而,我对数据流相对较新,不完全确定如何正确地使用它来解决我的问题。

问题

我正在尝试遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据。每个文件的大小约为 700MB1GB。每个文件包含 JSON 数据。为了并行处理这些文件并且不会用尽内存,我正在尝试使用带有 yield returnIEnumerable<>,然后进一步处理数据。

一旦我获得文件列表,我想要同时并行处理最多 4-5 个文件。我的困惑来自于:

  • 如何在 async/await 和数据流中使用 IEnumerable<>yeild return。遇到了svick提出的这个答案,但仍不确定如何将 IEnumerable<> 转换为 ISourceBlock,然后将所有块连接在一起并跟踪完成情况。
  • 在我的情况下,producer将非常快(遍历文件列表),但是consumer将非常慢(处理每个文件-读取数据,反序列化JSON)。在这种情况下,如何跟踪完成情况。
  • 我应该使用数据块的 LinkTo 特性连接各种块吗?还是使用诸如 OutputAvailableAsync()ReceiveAsync() 的方法将数据从一个块传播到另一个块。

代码

private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);

    var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
    return Task.WhenAll(tasks);
}

private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
    ...
    // Get list of file Uris
    ...
    foreach(var fileNameUri in fileNameUris)
        await targetBlock.SendAsync(fileNameUri, token);

    targetBlock.Complete();
}

private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
    var httpClient = new HttpClient();
    try
    {
        using (var stream = await httpClient.GetStreamAsync(fileNameUri))
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
            while (jsonTextReader.Read())
            {
                if (jsonTextReader.TokenType == JsonToken.StartObject)
                {
                    try
                    {
                        var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
                        await _messageBufferBlock.SendAsync(data, token);
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                    }
                }
            }
        }
    }
    catch(Exception ex)
    {
        // Should throw?
        // Or if converted to block then report using Fault() method?
    }
    finally
    {
        httpClient.Dispose();
        buffer.Complete();
    }
}

private void PrepareDataflow(CancellationToken token)
{
    _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
    {
        CancellationToken = token
    });

    var actionExecuteOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = ProcessingSize,
        MaxMessagesPerTask = 1,
        MaxDegreeOfParallelism = ProcessingSize
    };
    _processingBlock = new ActionBlock<string>(async fileName =>
    {
        try
        {
            await ProcessFileAsync(fileName, token);
        }
        catch (Exception ex)
        {
            _logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
            // Should fault the block?
        }
    }, actionExecuteOptions);

    _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    _messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = 50000
    });
    _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}
在上面的代码中,我没有使用 IEnumerable<DataType>yield return,因为它不能与 async/await 一起使用。所以我将输入缓冲区连接到 ActionBlock<DataType>,然后再将其发布到另一个队列。但是,通过使用 ActionBlock<>,我无法将其链接到下一个用于处理的块,并且必须手动从 ActionBlock<> 发布/发送异步消息到 BufferBlock<>。此外,在这种情况下,不确定如何跟踪完成状态。
这段代码可以工作,但是我相信肯定有更好的解决方案,我可以将所有的块链接在一起(而不是使用 ActionBlock<DataType>,然后从中发送消息到 BufferBlock<DataType>)。
另一种选择是使用 RxIEnumerable<> 转换为 IObservable<>,但我对 Rx 不太熟悉,不知道如何正确地混合使用 TPL DataflowRx

你的处理过程受限于CPU。因此,异步IO是毫无意义的。它不会为你节省一毫秒的处理时间。删除所有异步操作,问题就变得简单了。 - usr
2
@usr:我还没有仔细研究过这个具体的场景;问题陈述太过宽泛,没有提供一个好的 [mcve],使人们能够完全理解上下文。可能在这里异步操作并不有用。然而,我认为认为仅仅因为处理是 CPU 绑定的,异步 I/O 就是“无意义”的是一种谬误。异步操作提供了独立于可能的性能优势的架构优势,缺乏后者并不排除前者的可能性。 - Peter Duniho
@PeterDuniho 有哪些架构上的好处?你总是可以使用线程模拟任何形式的并发或并行。异步IO的唯一优点是无需线程(在异步IO加await的情况下,在GUI场景中非常出色)。但是,代码质量的损害是显著的。 - usr
1
我不打算关闭这个问题。我认为这个问题有一个很好的核心。由于这是新颖的材料,而不是每天100个机械的异步问题(“哦,我的应用程序因为我调用了Result或Wait而被锁定!”),所以我会给予怀疑的态度。@PeterDuniho - usr
2
“异步IO的唯一优点是无需线程” - 我猜我们只能同意不同意。首先,异步IO甚至并非“无需线程”; 它只是使用IOCP线程池而不需要额外显式创建的线程。其次,在C#中,“async”习惯用语为实现几乎非异步形式的异步代码提供了一种非常好的、清晰的方式,这对于任何性能提升都是有用的。个人见解可能会有所不同。 - Peter Duniho
显示剩余16条评论
3个回答

10

问题 1

您可以通过在消费者块上直接使用PostSendAsyncIEnumerable<T> 生产者插入到您的TPL Dataflow链中,如下所示:

foreach (string fileNameUri in fileNameUris)
{
    await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}

您也可以使用 BufferBlock<TInput>,但在您的情况下,这似乎是不必要的(甚至有害 - 请参见下一部分)。

问题2

如果您的生产者运行速度比URI的处理速度快(并且您已经指出了这一点),并且选择为您的 _processingBlock 设置了 BoundedCapacity,那么当块的内部缓冲区达到指定容量时,您的 SendAsync 将会“挂起”,直到缓冲槽释放,并且您的 foreach 循环将被限制。这种反馈机制创建了背压,并确保您不会耗尽内存。

问题3

在大多数情况下,您应该绝对使用 LinkTo 方法来链接您的块。不幸的是,由于 IDisposable 和非常大的(潜在的)序列之间的相互作用,您的情况是一个特殊情况。因此,您的完成将在缓冲和处理块之间自动流动(由于 LinkTo),但在此之后 - 您需要手动传播它。这很棘手,但可行。

我将用一个“Hello World”示例来说明这一点,其中生产者会遍历每个字符,而消费者(非常慢)会将每个字符输出到 Debug 窗口。

注意:LinkTo 不存在。

// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
    await Task.Delay(100);

    Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var producer = new ActionBlock<string>(async s =>
{
    foreach (char c in s)
    {
        await consumer.SendAsync(c);

        Debug.Print($"Yielded {c}");
    }
});

try
{
    producer.Post("Hello world");
    producer.Complete();

    await producer.Completion;
}
finally
{
    consumer.Complete();
}

// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);
这将输出:
Yielded H
H
Yielded e
e
Yielded l
l
Yielded l
l
Yielded o
o
Yielded
Yielded w w Yielded o o Yielded r r Yielded l l Yielded d d
从上面的输出可以看出,生产者被限制,块之间的交接缓冲区永远不会太大。
编辑: 您可能会发现通过传播完成更清晰。
producer.Completion.ContinueWith(
    _ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);

...在producer定义之后立即调用此方法。这可以使您略微减少生产者/消费者的耦合 - 但最终仍然需要记住观察Task.WhenAll(producer.Completion, consumer.Completion)


8
为了并行处理这些文件,避免内存不足,我正在尝试使用带有yield return的IEnumerable<>,然后进一步处理数据。但我认为这一步是不必要的。你实际上只是避免了一个文件名列表,即使你有数百万个文件,文件名列表也不会占用大量内存。
我将输入缓冲区链接到ActionBlock,然后再将其发布到另一个队列。然而,通过使用ActionBlock<>,我无法将其链接到下一个块进行处理,并且必须从ActionBlock<>手动Post/SendAsync到BufferBlock<>。此外,在这种情况下,不确定如何跟踪完成状态。
ActionBlock是“终端”块。它只接受输入,不产生任何输出。在您的情况下,您不想要ActionBlock;您需要TransformManyBlock,它接受输入,运行函数并产生输出(每个输入项可以有任意数量的输出项)。
另一个要记住的点是,所有缓冲块都有一个输入缓冲区。因此,额外的BufferBlock是不必要的。
最后,如果您已经处于“数据流领域”,通常最好以实际执行某些操作的数据流块结束(例如,ActionBlock而不是BufferBlock)。在这种情况下,您可以使用BufferBlock作为有界的生产者/消费者队列,其中某些其他代码正在消耗结果。个人而言,我认为将消费代码重写为ActionBlock的操作可能会更清晰,但保持消费者独立于数据流可能也更清晰。对于下面的代码,我留下了最终有界的BufferBlock,但如果您使用此解决方案,请考虑将该最终块更改为有界的ActionBlock。
private const int ProcessingSize= 4;
private static readonly HttpClient HttpClient = new HttpClient();
private TransformBlock<string, DataType> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
  PrepareDataflow(token);
  ListFiles(_fileBufferBlock, token);
  _processingBlock.Complete();
  return _processingBlock.Completion;
}

private void ListFiles(ITargetBlock<string> targetBlock, CancellationToken token)
{
  ... // Get list of file Uris, occasionally calling token.ThrowIfCancellationRequested()
  foreach(var fileNameUri in fileNameUris)
    _processingBlock.Post(fileNameUri);
}

private async Task<IEnumerable<DataType>> ProcessFileAsync(string fileNameUri, CancellationToken token)
{
  return Process(await HttpClient.GetStreamAsync(fileNameUri), token);
}

private IEnumerable<DataType> Process(Stream stream, CancellationToken token)
{
  using (stream)
  using (var sr = new StreamReader(stream))
  using (var jsonTextReader = new JsonTextReader(sr))
  {
    while (jsonTextReader.Read())
    {
      token.ThrowIfCancellationRequested();
      if (jsonTextReader.TokenType == JsonToken.StartObject)
      {
        try
        {
          yield _jsonSerializer.Deserialize<DataType>(jsonTextReader);
        }
        catch (Exception ex)
        {
          _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
        }
      }
    }
  }
}

private void PrepareDataflow(CancellationToken token)
{
  var executeOptions = new ExecutionDataflowBlockOptions
  {
    CancellationToken = token,
    MaxDegreeOfParallelism = ProcessingSize
  };
  _processingBlock = new TransformManyBlock<string, DataType>(fileName =>
      ProcessFileAsync(fileName, token), executeOptions);

  _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
  {
    CancellationToken = token,
    BoundedCapacity = 50000
  });
}

另一种选择是使用Rx。但是学习Rx可能会很困难,特别是对于混合异步和并行数据流的情况,这里就有了。
至于您的其他问题:
IEnumerable<>和yeild return如何与async/await和dataflow一起使用。
async和yield根本不兼容。至少在今天的语言中是这样的。在您的情况下,JSON读取器必须同步地从流中读取(它们不支持异步读取),因此实际的流处理是同步的,并且可以与yield一起使用。进行初始来回以获取流本身仍然可以是异步的,并且可以与async一起使用。这是我们今天能够做到的最好的方式,直到JSON读取器支持异步读取并且语言支持async yield为止。(Rx今天可以执行“async yield”,但是JSON读取器仍然不支持异步读取,因此在这种特定情况下无法帮助)。
在这种情况下,如何跟踪完成情况。
如果JSON读取器支持异步读取,则上述解决方案将不是最佳选择。在那种情况下,您将需要使用手动的SendAsync调用,并且需要仅链接这些块的完成情况,可以这样完成:
_processingBlock.Completion.ContinueWith(
    task =>
    {
      if (task.IsFaulted)
        ((IDataflowBlock)_messageBufferBlock).Fault(task.Exception);
      else if (!task.IsCanceled)
        _messageBufferBlock.Complete();
    },
    CancellationToken.None,
    TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
    TaskScheduler.Default);

我应该使用数据块的LinkTo特性连接各个块?还是使用OutputAvailableAsync()和 ReceiveAsync()等方法将数据从一个块传播到另一个块?
尽可能使用LinkTo。它为您处理所有边角情况。
// 应该抛出异常吗? // 块是否应该出错?
这完全取决于您。默认情况下,当任何项的处理失败时,块会出错,如果您正在传播完成状态,则整个块链都会出错。
出错的块相当严重; 它们抛弃任何正在进行的工作并拒绝继续处理。如果要重试,您必须构建新的数据流网格。
如果您喜欢“更柔和”的错误策略,您可以捕获异常并执行诸如记录它们(您当前的代码所做的)之类的操作,或者更改数据流块的性质,以将异常作为数据项传递。

为什么我不能使用 SendAsync,而必须在这里使用 Post - Tejas Vora
@TejasVora:让我反过来问一下。在这种情况下,“SendAsync”相对于“Post”有什么好处? - Stephen Cleary
没有特别的内容,只是一个信息性的问题。 - Tejas Vora

4

值得一提的是 Rx。除非我有什么遗漏,否则您需要的整个代码(除了现有的 ProcessFileAsync 方法)看起来像这样:

var query =
    fileNameUris
        .Select(fileNameUri =>
            Observable
                .FromAsync(ct => ProcessFileAsync(fileNameUri, ct)))
        .Merge(maxConcurrent : 4);

var subscription =
    query
        .Subscribe(
            u => { },
            () => { Console.WriteLine("Done."); });

完成。它是异步运行的。通过调用subscription.Dispose();可以取消它。并且你可以指定最大并发数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接