保证传递的Dataflow BroadcastBlock替代方案

7

我需要一个类似于广播块的对象,但具有保证交付的功能。因此,我使用了这个问题的答案。但我并不完全清楚这里的执行流程。我有一个控制台应用程序。以下是我的代码:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num => 
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef); 
        }, execopt));

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

每个数字都必须按顺序处理,因此在广播器块中我不能使用MaxDegreeOfParallelism。但是所有接收数字的操作块可以并行运行。

所以问题来了:

在输出中我可以看到不同的线程ID。我理解它的工作方式是否如下:

执行命中广播器中的await block.SendAsync(num);。 如果当前块还没有准备好接受这个数字,则执行退出广播器并挂起在Task.WaitAll。 当块接受该数字时,在线程池中执行广播器中foreach语句的其余部分。 并且同样的情况一直持续到结束。 每次foreach迭代都在线程池中执行。 但实际上是按顺序执行的。

我的理解是正确的吗? 我该如何更改此代码以异步地将数字发送到所有块?

为了确保如果其中一个块在此时不准备好接收数字,我将不等待它,并且所有准备好的其他块将接收数字。 并且所有块都可以并行运行。 并保证传递。

1个回答

7
假设您想让 broadcaster 一次处理一个项目,同时使目标块可以并发地接收该项目,则需要将 broadcaster 更改为同时向所有块提供该数字,然后异步等待它们共同接受该数字,然后再继续下一个数字:
var broadcaster = new ActionBlock<int>(async num => 
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);

现在,在这种情况下,如果你在await之后没有工作要做,你可以稍微优化一下,同时仍然返回一个可等待的任务:
ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);

在移动到下一个数字之前,是否有可能不等待所有解析器完成?我的意思是,每当某个解析器的缓冲区可用于接收时,广播器就会发送给它。这样我就不必等待最慢的解析器。我想我在问题中做了一些错误的解释。我需要每个单独的解析器按照它们接收到的顺序处理所有数字。但我不需要在每个数字后等待所有解析器都完成。 - shda
当您调用SendAsync并且目标缓冲区有空间时,返回的任务将立即完成。只有在其中一个缓冲区已满时才需要等待。您可以增加该缓冲区,但在确保项目移动到下一个块之前,我不会继续进行。 - i3arnon
但是我能否不等待其中一个缓冲区满了,而是继续处理下一个数字到空缓冲区? - shda
你可以这样做,但这可能导致内存泄漏。你需要存储额外的数字,以便稍后能够发送到完整的块中,这就像创建了另一个您无法控制的缓冲区。如果内存不是问题,可以简单地增加块的有界容量(甚至使其无限制),但这可能会填满您的RAM。 - i3arnon
我们能得到多少“保证”?Dataflow 中的所有内容都在内存中。如果关键任务存在任何缓冲区溢出的可能性,我们不应该使用更耐用的消息队列吗?如果不是关键任务或者不是必要的,执行块类型中的缓冲是否足够?本答案中概述的机制似乎过于复杂/简单。 - Marc L.
3
@MarcL. TPL Dataflow 是一个内存库。如果您需要数据在系统崩溃后仍然存在,当然不能依赖它,您应该使用持久化排队机制。顺便说一句,由于 .NET 是托管的,只要您不使用不安全的代码,就不可能发生缓冲区溢出(但是可能会因为堆栈溢出而崩溃)。 - i3arnon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接