public class DelimitedFileBlock : ISourceBlock<string>
{
private ISourceBlock<string> _source;
_source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });
//Read a file
While(!eof)
row = read one row
//if consumers are slow, then sleep for a while
while(!(_source as BufferBlock<string>).Post<string>(row))
{
Thread.Sleep(5000);
}
}
这是一个有着2400万行数据的5GB文件。
我现在有一个使用ActionBlock的目标块(Target block):
public class SolaceTargetBlock : ITargetBlock<string>
private ActionBlock<IBasicDataContract> _publishToSolaceBlock;
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
{
//post to another block to publish
bool success = _publishToSolaceBlock.Post(messageValue);
现在在控制台应用程序中,我指定:
SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);
我只是为了测试,将有界容量设置为1。
现在,我使用LinkTo将这三个消费者与我的来源链接起来:
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);
在10003行后,代码会进入Thread.Sleep(5000)语句,而while循环中的Post始终返回false。
由于我使用了LinkTo,因此理应当solaceTargetBlocks完成后能够选择下一条消息,但LinkTo似乎没有清除BufferBlock。那么,如何实现多个消费者间的负载均衡呢?难道需要编写接收并编写简单的负载均衡逻辑以在多个消费者之间进行分发吗?