我有一个相对简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产的输出应该由一个消费者消费。
为此,我使用
创建了一个
两个'Producers同时向
简化:
乍一看,这正是生产者-消费者的意思:多个生产者生产数据,而一个消费者正在消费生产出的数据。
然而,BufferBlock about thread safety说:
任何实例成员都不能保证线程安全。
我认为TPL中的P代表Parallel! 我应该担心吗?我的代码不是线程安全的吗?我应该使用不同的TPL Dataflow类吗?
为此,我使用
System.Threading.Tasks.Dataflow.BufferBlock<T>
。创建了一个
BufferBlock
对象。一个Consumer
正在监听此BufferBlock
,并处理接收到的任何输入。两个'Producers同时向
BufferBlock
发送数据。简化:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
{
while(await bufferBlock.OutputAvailable())
{
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
}
}
async Task Produce1()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
async Task Produce2()
{
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
{
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
}
}
我希望先启动消费者,然后将生产者作为单独的任务启动:
var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());
// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock
// await for the Consumer to finish:
await taskConsumer;
乍一看,这正是生产者-消费者的意思:多个生产者生产数据,而一个消费者正在消费生产出的数据。
然而,BufferBlock about thread safety说:
任何实例成员都不能保证线程安全。
我认为TPL中的P代表Parallel! 我应该担心吗?我的代码不是线程安全的吗?我应该使用不同的TPL Dataflow类吗?
BufferBlock
而不是ActionBlock
?async Task Consume()
可以被 替换为只有Process(...)
的调用,例如ActionBLock<int>(x => Process(x))
。 - JSteward