TPL数据流:固定缓冲区大小而不会丢弃项目

9

在玩转数据流之后,我遇到了一个新问题。我想限制所有块的输入队列。我的生产块(ActionBlock)非常快地创建5000个元素,并将它们发布到广播块中。因此,如果我将广播块的BoundedCapacity设置为100,则会丢失大量数据。但我更希望生产块等待缓冲块的输入队列中有新的空位。

有没有办法解决这个问题?


1个回答

8
这正是BufferBlock的用途。如果设置了其BoundedCapacity并且已满,它将推迟接收任何消息,直到有人消费它们。这意味着例如Post()将会被阻塞,而SendAsync()将返回未完成的Task
编辑:没有内置的块可以发送到多个目标并且不会丢弃数据。但你可以很容易地从ActionBlock和发送循环构建一个自己的块。
static ITargetBlock<T> CreateMultipleTargetsBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
{
    var targetsList = targets.ToList();

    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

    // TODO: propagate completion from block to targets

    return block;
}

这段代码假设您不需要为每个目标克隆数据,并且目标列表永远不会更改。修改代码应该相当简单。


首先,感谢您。我完全看不见...。但是我需要一个可以向多个接收者发送消息的块。这对于缓冲块来说是不可能的。有什么解决方法吗? - Daffi
当其中一个接收器变慢时,应该发生什么?其他接收器是否需要等待它?(我假设接收器也设置了BoundedCapacity,否则在缓冲区设置BoundedCapacity基本上没有任何效果。) - svick
是的,BoundedCapacity 总是相同的值。同时也是真的,其他接收者需要等待慢操作。 - Daffi
7
根据我的经验,在一个已满的BufferBlock上调用Post()方法时,不会被阻塞。 - piedar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接