TPL数据流LinkTo多个消费者无法工作

6
我有一个BufferBlock,可以向其中发布消息:
public class DelimitedFileBlock : ISourceBlock<string>
{
    private ISourceBlock<string> _source;
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });

    //Read a file
    While(!eof)
        row = read one row 
    //if consumers are slow, then sleep for a while
    while(!(_source as BufferBlock<string>).Post<string>(row))
    {
        Thread.Sleep(5000);
    }
}

这是一个有着2400万行数据的5GB文件。

我现在有一个使用ActionBlock的目标块(Target block):

public class SolaceTargetBlock : ITargetBlock<string>
       private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

       public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
    {
        //post to another block to publish
        bool success = _publishToSolaceBlock.Post(messageValue);

现在在控制台应用程序中,我指定:

 SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

 DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

我只是为了测试,将有界容量设置为1。

现在,我使用LinkTo将这三个消费者与我的来源链接起来:

 delimitedFileBlock.LinkTo(solaceTargetBlock1);      
 delimitedFileBlock.LinkTo(solaceTargetBlock2);      
 delimitedFileBlock.LinkTo(solaceTargetBlock3);      

在10003行后,代码会进入Thread.Sleep(5000)语句,而while循环中的Post始终返回false。

由于我使用了LinkTo,因此理应当solaceTargetBlocks完成后能够选择下一条消息,但LinkTo似乎没有清除BufferBlock。那么,如何实现多个消费者间的负载均衡呢?难道需要编写接收并编写简单的负载均衡逻辑以在多个消费者之间进行分发吗?


1
这完全取决于您如何实现块接口。但是,除非您正在做更复杂的事情,否则您不必(也可能不应该)自己实现接口。只需创建所需的块设置即可。 - svick
1个回答

14

DataflowBlock<T>Post方法文档中可以看到(重点是我的):

一旦目标块决定接受或拒绝该项,此方法将返回,

这意味着目标可以选择拒绝该块(这就是您看到的行为)。

此外,它还指出:

对于支持推迟提供的消息的目标块,或者对于可能在其Post实现中进行更多处理的块,请考虑使用SendAsync,它将立即返回并使目标能够推迟发布的消息,并在SendAsync返回后稍后使用它。

这意味着您可能会获得更好的结果(取决于目标块),因为您的消息可能会被推迟,但仍会被处理,而不是直接被拒绝。

我想你所看到的问题可能与BufferBlock<T>和三个ActionBlock<TInput>实例上的BoundedCapacity property设置有关:
  • BufferBlock<T>上的最大缓冲区为10000;一旦将10000个项目放入队列中,它将拒绝其余的(参见上面的第二个引用),因为它无法处理它们(SendAsync也不能在这里工作,因为它无法缓冲要延迟的消息)。

  • ActionBlock<TInput>实例上的最大缓冲区为1,而你有三个这样的实例。

10,000 + (1 * 3) = 10,000 + 3 = 10,003

为了解决这个问题,你需要做一些事情。
首先,在创建ActionBlock<TInput>实例时,需要为ExecutionDataFlowBlockOptionsMaxDegreeOfParallelism属性设置更合理的值。
默认情况下,ActionBlock<TInput>MaxDegreeOfParallelism设置为1;这保证了调用将被序列化,您不必担心线程安全。如果你希望ActionBlock<T>关注线程安全,则保持此设置。
如果ActionBlock<TInput>是线程安全的,那么您没有理由对其进行节流,应将MaxDegreeOfParallelism设置为DataflowBlockOptions.Unbounded
如果在ActionBlock<TInput>中访问一些共享资源,并且这些资源可以被并发地有限访问,那么您可能正在做错误的事情。
如果您有某种共享资源,则很可能应该通过另一个块运行它,并在该块上设置MaxDegreeOfParallelism
其次,如果您关心吞吐量并且允许丢弃项目,则应设置BoundedCapacity属性。
请注意,你指出“如果消费者速度慢,请休息一会儿”;如果你正确地连接了块,则没有理由这样做,你应该让数据自由流动,并仅在需要时放置限制。生产者不应负责限制消费者,应该让消费者负责限制。
最后,你的代码看起来不需要自己实现数据流块接口。你可以像这样构建它:
// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}

还要注意,拥有三个 ActionBlock<TInput> 实例是不必要的,除非您需要将输出过滤到不同的操作(在这里您没有这样做),所以以上代码可以简化为以下内容(假设您的操作是线程安全的,因此您将将 MaxDegreeOfParallelism 增加到 Unbounded):

// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接