我需要一个类似于广播块的对象,但具有保证交付的功能。因此,我使用了这个问题的答案。但我并不完全清楚这里的执行流程。我有一个控制台应用程序。以下是我的代码:
static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));
ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);
broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});
Task producer = Produce(broadcaster);
List<Task> ToWait = new List<Task>();
foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
ToWait.Add(producer);
Task.WaitAll(ToWait.ToArray());
Console.ReadLine();
}
static async Task Produce(ActionBlock<int> broadcaster)
{
for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);
broadcaster.Complete();
}
每个数字都必须按顺序处理,因此在广播器块中我不能使用MaxDegreeOfParallelism。但是所有接收数字的操作块可以并行运行。
所以问题来了:
在输出中我可以看到不同的线程ID。我理解它的工作方式是否如下:
执行命中广播器中的await block.SendAsync(num);
。
如果当前块还没有准备好接受这个数字,则执行退出广播器并挂起在Task.WaitAll。
当块接受该数字时,在线程池中执行广播器中foreach语句的其余部分。
并且同样的情况一直持续到结束。
每次foreach迭代都在线程池中执行。 但实际上是按顺序执行的。
我的理解是正确的吗? 我该如何更改此代码以异步地将数字发送到所有块?
为了确保如果其中一个块在此时不准备好接收数字,我将不等待它,并且所有准备好的其他块将接收数字。 并且所有块都可以并行运行。 并保证传递。
SendAsync
并且目标缓冲区有空间时,返回的任务将立即完成。只有在其中一个缓冲区已满时才需要等待。您可以增加该缓冲区,但在确保项目移动到下一个块之前,我不会继续进行。 - i3arnon