我认为在测试方法中,“results”集合变量的类型应该是 BlockingCollection<int>,而不是 List<int>
。如果我错了,请证明给我看。我从https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html中取得了这个例子。
/// <summary>
/// Sends every element of <paramref name="values"/> to <paramref name="queue"/>,
/// one element at a time, awaiting each send before moving on to the next.
/// </summary>
/// <param name="queue">The buffer block that receives the values.</param>
/// <param name="values">The sequence of values to send, in enumeration order.</param>
/// <returns>A task that completes once every value has been offered to the queue.</returns>
private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    using (IEnumerator<int> cursor = values.GetEnumerator())
    {
        while (cursor.MoveNext())
        {
            // The boolean result of SendAsync is intentionally not inspected here.
            await queue.SendAsync(cursor.Current);
        }
    }
}
/// <summary>
/// Starts three producers that send the ranges 0-9, 10-19 and 20-29 to
/// <paramref name="queue"/>, waits for all of them, then marks the queue complete.
/// </summary>
/// <param name="queue">The buffer block shared by all three producers.</param>
/// <returns>A task that completes after all producers finish and the queue is completed.</returns>
public async Task ProduceAll(BufferBlock<int> queue)
{
    // Each Produce call starts running as soon as it is invoked, so all
    // three producers are already in flight before WhenAll is awaited.
    Task[] producers =
    {
        Produce(queue, Enumerable.Range(0, 10)),
        Produce(queue, Enumerable.Range(10, 10)),
        Produce(queue, Enumerable.Range(20, 10)),
    };

    await Task.WhenAll(producers);

    // Only signal completion once no producer can send any more items.
    queue.Complete();
}
/// <summary>
/// Verifies that a single consumer linked to a bounded queue receives every
/// value sent by the three producers started in <c>ProduceAll</c>.
/// </summary>
[TestMethod]
public async Task ConsumerReceivesCorrectValues()
{
    // A plain List<int> is sufficient here: ActionBlock runs its delegate with
    // MaxDegreeOfParallelism = 1 unless configured otherwise, so Add is never
    // called concurrently, and the list is only read after consumer.Completion
    // has been awaited.
    var results = new List<int>();

    // Define the mesh.
    var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, });
    // This declaration was commented out while still being passed to the
    // ActionBlock constructor below, which does not compile; it is restored.
    var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
    var consumer = new ActionBlock<int>(x => results.Add(x), consumerOptions);
    queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true, });

    // Start the producers.
    var producers = ProduceAll(queue);

    // Wait for everything to complete.
    await Task.WhenAll(producers, consumer.Completion);

    // Ensure the consumer got what the producer sent. Arrival order across the
    // three producers is not fixed, so compare after sorting.
    Assert.IsTrue(results.OrderBy(x => x).SequenceEqual(Enumerable.Range(0, 30)));
}
code
/// <summary>
/// Drains <paramref name="queue"/> until it is completed and empty, collecting
/// every received value in arrival order.
/// </summary>
/// <param name="queue">The buffer block to consume from.</param>
/// <returns>The values received from the queue, in the order they were received.</returns>
private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        // TryReceive is used instead of ReceiveAsync: if another consumer takes
        // the item between OutputAvailableAsync and the receive, TryReceive just
        // returns false and we wait again, whereas ReceiveAsync would either wait
        // for a later item or throw InvalidOperationException once the block has
        // completed.
        int item;
        while (queue.TryReceive(out item))
        {
            ret.Add(item);
        }
    }

    return ret;
}
- Oleg Ivanov