11得票1回答
SingleProducerConstrained和MaxDegreeOfParallelism - SingleProducerConstrained:限制只有一个生产者的并行处理器。 - MaxDegreeOfParallelism:并行处理器的最大并行度。

在C# TPL Dataflow库中,SingleProducerConstrained是ActionBlock的一种优化选项,当只有一个线程在提供操作块时,可以使用它: 如果一个块只会被单个生产者使用,也就是说,每次只有一个线程会在块上使用Post、OfferMessage和Complet...

14得票2回答
如何在TPL Dataflow中将多个目标块与源块链接?

我原以为以下代码可以从两个发布者(publishers)那里都输出结果,但实际上只从第一个发布者那里输出结果: 我期望以下代码可以从两个发布者那里都得到输出,但它仅从第一个发布者那里输出结果:var broadcastBlock = new BroadcastBlock<int>...

9得票3回答
TPL数据流块消耗了所有可用内存。

我有一个TransformManyBlock块,其设计如下: 输入:文件路径 输出:文件内容的IEnumerable,每行一个 我正在处理一个超大文件(61GB),因为它太大无法放入内存,所以我将此块和所有下游块的BoundedCapacity设置为非常低的值(例如1)。然而,该块显然...

23得票2回答
BufferBlock在TryReceiveAll后使用OutputAvailableAsync时出现死锁

在回答这个问题期间,我编写了以下代码段:an answervar buffer = new BufferBlock<object>(); var producer = Task.Run(async () => { while (true) { ...

13得票3回答
使用TPL Dataflow按持续时间或阈值进行批处理

我已经使用TPL Dataflow实现了生产者..消费者模式。使用案例是代码从Kafka总线中读取消息。为了提高效率,当发送到数据库时,我们需要批量处理消息。 在TPL数据流中是否有一种方式可以保留消息,并在达到大小或持续时间阈值时触发? 例如,当前的实现在从队列中拉出消息后立即发布该消息。 ...

54得票2回答
实现可重试代码块的正确完成

提示:各位,这个问题不是关于如何实现重试策略的,而是关于正确完成TPL Dataflow块的。 这个问题主要是我之前的一个问题Retry policy within ITargetBlock的延续。对于这个问题的答案是@svick的聪明解决方案,利用了TransformBlock(源)和Tra...

9得票2回答
数百个用户和大文件的良好处理方法

我有几个数据文件(每个文件接近1GB),其中的数据是字符串行。 我需要使用数百个消费者处理每个文件。每个消费者执行的处理与其他消费者不同。消费者不会同时写入任何地方,他们只需要输入字符串。处理后,他们会更新本地缓冲区。消费者可以轻松并行执行。 重要提示:对于一个特定的文件,每个消费者必须按...

7得票1回答
等待 ActionBlock<T> - TPL 数据流

我正在使用TPL DataFlow和ActionBlock来创建并行性。使用TPL DataFlow的理由是它支持异步,但我无法使其正常工作。 var ab = new ActionBlock&lt;Group&gt;(async group =&gt; { try { ...

7得票2回答
当消费者不堪重负时,如何让快速生产者暂停?

我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式。我有一个大的数据流网格,其中约有40个块。该网格有两个主要的功能部分:生产者部分和消费者部分。当消费者处理一些指定数量的工作项时,生产者应继续为消费者提供大量的工作。否则,应用程序会消耗大量内存/CPU并且行为不可持续。我...

7得票2回答
ActionBlock<T> vs Task.WhenAll

我希望知道执行多个异步方法的推荐方式是什么? 在System.Threading.Tasks.Dataflow中,我们可以指定最大并行度,但对于Task.WhenAll来说,无界限可能是默认值吗? 这样做: var tasks = new List&lt;Task&gt;(); fore...