TPL Dataflow 的 BufferBlock 是线程安全的吗?

5
我有一个相对简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产的输出应该由一个消费者消费。
为此,我使用 System.Threading.Tasks.Dataflow.BufferBlock<T>
创建了一个BufferBlock对象。一个Consumer正在监听此BufferBlock,并处理接收到的任何输入。
两个'Producers同时向BufferBlock发送数据。
简化:
BufferBlock<int> bufferBlock = new BufferBlock<int>();

async Task Consume()
{
    while(await bufferBlock.OutputAvailable())
    {
         int dataToProcess = await outputAvailable.ReceiveAsync();
         Process(dataToProcess);
    }
}

async Task Produce1()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

async Task Produce2()
{
    IEnumerable<int> numbersToProcess = ...;
    foreach (int numberToProcess in numbersToProcess)
    {
         await bufferBlock.SendAsync(numberToProcess);
         // ignore result for this example
    }
}

我希望先启动消费者,然后将生产者作为单独的任务启动:

var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());

// await until both producers are finished:
await Task.WhenAll(new Task[] {taskProduce1, taskProduce2});
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock

// await for the Consumer to finish:
await taskConsumer;

乍一看,这正是生产者-消费者的意思:多个生产者生产数据,而一个消费者正在消费生产出的数据。
然而,BufferBlock about thread safety说:
任何实例成员都不能保证线程安全。
我认为TPL中的P代表Parallel! 我应该担心吗?我的代码不是线程安全的吗?我应该使用不同的TPL Dataflow类吗?

2
MSDN文章中的“线程安全”部分是出了名的不可靠。作者只是从文章模板中复制粘贴,往往无法从开发人员那里获得足够的信息以使其更准确。请记住,仅因为DataBlock是线程安全的,并不能自动使您自己的代码也线程安全。当另一个线程正在忙于添加项目时,像Count属性这样的东西就变得毫无价值了。希望这一点是显而易见的,但在文档中没有明确说明。 - Hans Passant
1
为什么要使用 BufferBlock 而不是 ActionBlockasync Task Consume() 可以被 替换为只有 Process(...) 的调用,例如 ActionBLock<int>(x => Process(x)) - JSteward
2个回答

4
是的,BufferBlock 类是线程安全的。我无法通过指向官方文档中的“线程安全”部分来支持此声明,因为文档中已删除该部分。但是我可以在源代码中看到该类包含用于同步传入消息的锁对象:
/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }

当调用Post扩展方法 (源代码) 时,会调用显式实现的ITargetBlock.OfferMessage方法 (源代码)。以下是该方法的摘录:

DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
    T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
    //...
    lock (IncomingLock)
    {
        //...
        _source.AddMessage(messageValue);
        //...
    }
}

如果这个类或任何在TPL Dataflow库中包含的XxxBlock类不是线程安全的话,那将会非常奇怪。这将严重阻碍使用这个伟大的库的便捷性。


1
我认为一个ActionBlock<T>更适合你的需求,因为它具有内置缓冲区,许多生产者可以通过它发送数据。默认块选项在单个后台任务上处理数据,但您可以为并行性和有界容量设置新值。对于ActionBlock<T>,确保线程安全的主要关注点将是您传递的委托,该委托处理每个消息。该函数的操作必须独立于每个消息,即不修改共享状态,就像任何Parrallel...函数一样。
public class ProducerConsumer
{
    private ActionBlock<int> Consumer { get; }

    public ProducerConsumer()
    {
        Consumer = new ActionBlock<int>(x => Process(x));            
    }

    public async Task Start()
    {
        var producer1Tasks = Producer1();
        var producer2Tasks = Producer2();
        await Task.WhenAll(producer1Tasks.Concat(producer2Tasks));
        Consumer.Complete();
        await Consumer.Completion;
    }

    private void Process(int data)
    {
        // process
    }

    private IEnumerable<Task> Producer1() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));

    private IEnumerable<Task> Producer2() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
}

我的关注点是ActionBlock.SendAsync函数的线程安全性:两个不同的线程在同一个ActionBlock对象上调用ActionBlock.SendAsync。ActionBlock类提到了与BufferBlock相同的有关线程安全性的备注:SendAsync不是线程安全的。你认为ActionBlock比BufferBlock更加线程安全的原因是什么? - Harald Coppoolse
2
该备注是一个通用的警告,只有静态方法才能保证线程安全。话虽如此,如果两个线程在同一个Acton/Buffer Block上调用SendAsync,那么就会有两条消息排队等待插入到块的输入缓冲区中。只要块选项SingleProducerConstrained设置为false(默认行为),SendAsync本身就是多生产者线程安全的。 - JSteward

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接