我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式。我有一个大的数据流网格,其中约有40个块。该网格有两个主要的功能部分:生产者部分和消费者部分。当消费者处理一些指定数量的工作项时,生产者应继续为消费者提供大量的工作。否则,应用程序会消耗大量内存/CPU并且行为不可持续。我想在消费者忙于某些指定数量的工作项时暂停生产者。
我制作了演示应用程序来演示这个问题:
应用程序打印出类似于这样的内容:
那意味着生产者会不断地向消费者发送消息。我希望它能暂停并等待消费者完成当前工作,然后生产者应继续为消费者提供消息。
我的问题与其他问题相似,但没有得到正确的答案。我尝试了那个解决方案,但在这里它不起作用,允许生产者向消费者发送大量消息。同时设置
到目前为止,我想到的唯一解决方案是制作自己的块,监视目标块队列,并根据目标块的队列采取行动。但我希望这对于这个问题来说有点过头了。
我制作了演示应用程序来演示这个问题:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false
};
var boundedOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false,
BoundedCapacity = 5
};
var bufferBlock = new BufferBlock<int>(boundedOptions);
var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
var broadcastBlock = new BroadcastBlock<int>(x => x, options);
var consumerBlock = new ActionBlock<int>(async x =>
{
var delay = 1000;
if (x > 10) delay = 5000;
await Task.Delay(delay);
Console.WriteLine(x);
}, boundedOptions);
producerBlock.LinkTo(bufferBlock);
bufferBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(producerBlock);
broadcastBlock.LinkTo(consumerBlock);
bufferBlock.Post(1);
consumerBlock.Completion.Wait();
}
}
}
应用程序打印出类似于这样的内容:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079
那意味着生产者会不断地向消费者发送消息。我希望它能暂停并等待消费者完成当前工作,然后生产者应继续为消费者提供消息。
我的问题与其他问题相似,但没有得到正确的答案。我尝试了那个解决方案,但在这里它不起作用,允许生产者向消费者发送大量消息。同时设置
BoundedCapacity
也不起作用。到目前为止,我想到的唯一解决方案是制作自己的块,监视目标块队列,并根据目标块的队列采取行动。但我希望这对于这个问题来说有点过头了。
Rx
吗?看看这个答案:https://dev59.com/0XE85IYBdhLWcg3w64IA - Luc Morin