当消费者不堪重负时,如何让快速生产者暂停?

7
我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式。我有一个大的数据流网格,其中约有40个块。该网格有两个主要的功能部分:生产者部分和消费者部分。当消费者处理一些指定数量的工作项时,生产者应继续为消费者提供大量的工作。否则,应用程序会消耗大量内存/CPU并且行为不可持续。我想在消费者忙于某些指定数量的工作项时暂停生产者。
我制作了演示应用程序来演示这个问题:

mesh

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false
            };

            var boundedOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                BoundedCapacity = 5
            };

            var bufferBlock = new BufferBlock<int>(boundedOptions);
            var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
            var broadcastBlock = new BroadcastBlock<int>(x => x, options);

            var consumerBlock = new ActionBlock<int>(async x =>
            {
                var delay = 1000;
                if (x > 10) delay = 5000;

                await Task.Delay(delay);

                Console.WriteLine(x);
            }, boundedOptions);

            producerBlock.LinkTo(bufferBlock);
            bufferBlock.LinkTo(broadcastBlock);
            broadcastBlock.LinkTo(producerBlock);
            broadcastBlock.LinkTo(consumerBlock);

            bufferBlock.Post(1);

            consumerBlock.Completion.Wait();            
        }        
    }
}

应用程序打印出类似于这样的内容:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079

那意味着生产者会不断地向消费者发送消息。我希望它能暂停并等待消费者完成当前工作,然后生产者应继续为消费者提供消息。
我的问题与其他问题相似,但没有得到正确的答案。我尝试了那个解决方案,但在这里它不起作用,允许生产者向消费者发送大量消息。同时设置BoundedCapacity也不起作用。
到目前为止,我想到的唯一解决方案是制作自己的块,监视目标块队列,并根据目标块的队列采取行动。但我希望这对于这个问题来说有点过头了。

你考虑过使用 Rx 吗?看看这个答案:https://dev59.com/0XE85IYBdhLWcg3w64IA - Luc Morin
1
在你的演示中,生产者可以独立生成所有消息,而无需通过广播块从自身接收消息。你的真实代码也是如此吗,还是那个生产者→生产者的循环是必要的? - svick
@svick 但是如果我将生产者链接到自身,那么消费者如何获取来自生产者的数据呢?真正的生产者周期由一堆块组成,这些块实际上加载连续的评论页面,解析页面并将评论传递给消费者。 - kseen
1
@kseen,我在问广播→生产者链接是否必要,或者是否可以避免。 - svick
@svick 对于误解我感到抱歉。如果这很重要并且可以导致更优雅的代码,那么将其删除会很好。 - kseen
显示剩余2条评论
2个回答

6
如果您需要保持生产者→缓冲区→广播循环完整,那么您需要用其他块替换广播块,该块仍然广播它接收到的消息,但在其目标之一已满时等待。只要在创建该块时知道其目标,就可以使用ActionBlock构建它(代码从我的另一个答案中复制):
public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    DataflowBlockOptions options, params ITargetBlock<T>[] targets)
{
    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targets)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targets)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}

使用此功能,您可以声明广播块:
var broadcastBlock = CreateGuaranteedBroadcastBlock(
    boundedOptions, producerBlock, consumerBlock);

您还需要删除从broadcastBlock链接的LinkTo行。

您原始代码中存在的一个问题,这并没有解决,那就是在TPL Dataflow中,带有循环的完成是一个困难的问题。


关于完成,如果我的网络将是连续的呢?就像未来没有任何完成,它应该在应用程序工作时继续工作。 - kseen
我刚刚在我的演示应用程序中尝试了这个“GuaranteedBroadcastBlock”,它的效果非常好!完美!非常感谢。 - kseen
这是最理想的情况:您不需要完成,所以即使它不起作用也没关系。 - svick

0

看起来你的生产者生成了一个序列,因此不需要整个生产者→缓冲区→广播循环。相反,所有三个块都可以被一个async循环替换,该循环生成下一个项目,然后使用await SendAsync()将其发送给消费者:

Task.Run(async () =>
{
    int i = 1;
    while (true)
    {
        await consumerBlock.SendAsync(i);
        i++;
    }
    consumerBlock.Complete();
});

这样一来,一旦消费者达到其容量,await SendAsync() 将确保生产者等待,直到消费者消耗一个项目。

如果您想将这样的生产者封装到数据流块中,以便您可以将其链接到消费者,您可以这样做。


我的真正“生产者”是一组块,它们加载评论页面(其中包含下一个评论页面的链接),解析当前评论页面的内容,将评论发送给消费者,并再次启动此循环,将下一个评论页面的地址传递给生产者循环中的第一个块。因此,不幸的是,这不仅仅是一个序列。它就像是一个链接序列,其中每个元素都有指向下一个元素的地址,而序列中的最后一个元素没有下一个元素的地址。对于这个问题如此简单,我感到抱歉。 - kseen
我刚刚制作了更好地代表实际情况的图表。请看这里:http://imgur.com/iEklfeG - kseen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接