在ActionBlock中收集结果时阻止集合

3
我认为在测试方法中,“results”集合变量的类型应该是BlockingCollection<int>而不是List<int>。如果我错了,请证明给我看。我从https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html中取得了这个例子。
private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        await queue.SendAsync(value);
    }
}

public async Task ProduceAll(BufferBlock<int> queue)
{
    var producer1 = Produce(queue, Enumerable.Range(0, 10));
    var producer2 = Produce(queue, Enumerable.Range(10, 10));
    var producer3 = Produce(queue, Enumerable.Range(20, 10));
    await Task.WhenAll(producer1, producer2, producer3);
    queue.Complete();
}

[TestMethod]
public async Task ConsumerReceivesCorrectValues()
{
    var results = new List<int>();

    // Define the mesh.
    var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, });

    //var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };

    var consumer = new ActionBlock<int>(x => results.Add(x), consumerOptions);
    queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true, });

    // Start the producers.
    var producers = ProduceAll(queue);

    // Wait for everything to complete.
    await Task.WhenAll(producers, consumer.Completion);

    // Ensure the consumer got what the producer sent.
    Assert.IsTrue(results.OrderBy(x => x).SequenceEqual(Enumerable.Range(0, 30)));
}
1个回答

4

ActionBlock<T>默认将其委托限制为一次执行(MaxDegreeOfParallelism1),因此无需使用BlockingCollection<T>而不是List<T>

我测试您的代码,结果正常通过,没有问题。

如果ActionBlock<T>传递了更高的MaxDegreeOfParallelism选项,则需要保护List<T>或替换为BlockingCollection<T>


谢谢!正如我所预料的那样。从块中收集数据的方式是典型的解决方案吗?或者在一个消费者场景中组织消费者任务并在那里收集数据会更好? codeprivate static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue) { var ret = new List<int>(); while (await queue.OutputAvailableAsync()) { ret.Add(await queue.ReceiveAsync()); }return ret;}code - Oleg Ivanov
通常情况下,一旦您建立了数据流管道或网格,您的最终操作将成为“ActionBlock”的一部分。收集所有结果项并不常见,尽管偶尔会这样做。 - Stephen Cleary

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接