在玩转数据流之后,我遇到了一个新问题。我想限制所有块的输入队列。我的生产块(ActionBlock)非常快地创建5000个元素,并将它们发布到广播块中。因此,如果我将广播块的BoundedCapacity设置为100,则会丢失大量数据。但我更希望生产块等待缓冲块的输入队列中有新的空位。
有没有办法解决这个问题?
在玩转数据流之后,我遇到了一个新问题。我想限制所有块的输入队列。我的生产块(ActionBlock)非常快地创建5000个元素,并将它们发布到广播块中。因此,如果我将广播块的BoundedCapacity设置为100,则会丢失大量数据。但我更希望生产块等待缓冲块的输入队列中有新的空位。
有没有办法解决这个问题?
BufferBlock
的用途。如果设置了其BoundedCapacity
并且已满,它将推迟接收任何消息,直到有人消费它们。这意味着例如Post()
将会被阻塞,而SendAsync()
将返回未完成的Task
。ActionBlock
和发送循环构建一个自己的块。static ITargetBlock<T> CreateMultipleTargetsBlock<T>(
IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
{
var targetsList = targets.ToList();
var block = new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
},
new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });
// TODO: propagate completion from block to targets
return block;
}
这段代码假设您不需要为每个目标克隆数据,并且目标列表永远不会更改。修改代码应该相当简单。
BoundedCapacity
,否则在缓冲区设置BoundedCapacity
基本上没有任何效果。) - svickBufferBlock
上调用Post()
方法时,不会被阻塞。 - piedar