微软TPL Dataflow - 同步处理相关请求

3

我先为标题道歉,但我想这是最能描述这个行为的方式了。

需求是处理消息总线上的请求。进来的请求可能涉及到相关或者分组的id。我想要的行为是对于一系列处理相关id的请求进行同步处理。然而不同的id可以异步进行处理。

我使用一个ConcurrentDictionary来跟踪正在处理的请求和LinkTo中的谓词。

这应该提供了对相关请求的同步处理。

但是我收到的行为是第一个请求被处理了,第二个请求被丢弃了。

我已经附加了一个从控制台应用程序中模拟此问题的示例代码。

任何指导或反馈都将不胜感激。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            var requestTracker = new ConcurrentDictionary<string, string>();

            var bufferBlock = new BufferBlock<Request>();

            var actionBlock = new ActionBlock<Request>(x => 
            {
                Console.WriteLine("processing item {0}",x.Name);
                Thread.Sleep(5000);
                string itemOut = null;
                requestTracker.TryRemove(x.Id, out itemOut);
            });

            bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));


            var publisher = Task.Run(() =>
            {
                var request = new Request("item_1", "first item");
                bufferBlock.SendAsync(request);

                var request_1 = new Request("item_1", "second item");
                bufferBlock.SendAsync(request_1);

            });

            publisher.Wait();
            Console.ReadLine();
        }
    }

    public class Request
    {
        public Request(string id, string name)
        {
            this.Id = id;
            this.Name = name;
        }
        public string Id { get; set; }
        public string Name { get; set; }
    }
}

你应该让异常沿着数据流管道传播,这样你就可以看到出了什么问题。请查看MS在此MSDN Walkthrough末尾的完整示例。然后,你可以处理AggregateException以找出出了什么问题。 - JNYRanger
2
你的意思是想要一个具有相同ID的组依次处理,而组可以同时处理吗?如果是这样,这里有你的答案:https://dev59.com/2XvZa4cB1Zd3GeqP-xU2 - i3arnon
1
@I3arnon 我认为这里这样做很没必要复杂化。 - svick
@svick 你会移除哪个“特性”? - i3arnon
@I3arnon 哈希。你可以为每个组只有一个块(假设这几乎是一个内存泄漏的事实并不重要)。 - svick
显示剩余5条评论
2个回答

2
  1. You say you want to process some requests in parallel (at least I assume that's what you meant by “asynchronously”), but ActionBlock is not parallel by default. To change that, set MaxDegreeOfParallelism.

  2. You're trying to use TryAdd() as the filter, but that won't work for two reasons:

    1. The filter is invoked only once, it's not automatically retried or anything like that. That means that if an item doesn't go through, it would never go through, even after the item that was blocking it was completed.
    2. If an item is stuck in the output queue of a block, no other items are going to get out of that block. This could significantly reduce the level of parallelism, even if you somehow worked around the previous issue.
  3. I think the simplest solution here would be to have a block for each group, that way, items from each group will be processed sequentially, but items from different groups will be processed in parallel. In code, it could look something like:

    var processingBlocks = new Dictionary<string, ActionBlock<Request>>();
    
    var splitterBlock = new ActionBlock<Request>(request =>
    {
        ActionBlock<Request> processingBlock;
    
        if (!processingBlocks.TryGetValue(request.Id, out processingBlock))
        {
            processingBlock = processingBlocks[request.Id] =
                new ActionBlock<Request>(r => /* process the request here */);
        }
    
        processingBlock.Post(request);
    });
    

    The issue with this approach is that the processing blocks for groups never go away. If you can't afford that (it's a memory leak), because you're going to have a large amount of groups, then the hashing approach suggested by I3arnon is the way to go.


感谢您的帮助,svick - 实际上我最终采用了I3arnon的方法。对于我尝试做的事情来说,更加稳健且容错性更强。 - rizan

1
我认为这是因为您的LinkTo()没有正确设置。通过将LinkTo()和一个函数作为参数传递,您正在添加一个条件。所以这行代码:
bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id, x.Name));

这本质上是在说,如果您能够添加到并发字典中,则从 bufferBlock 传递数据到 actionBlock 是可行的,但至少在您的示例代码中并没有很好的解释这一点。

相反,在此情况下,您应该使用不带 lambda 的方式将 bufferBlock 链接到 actionBlock,因为您不需要条件链接(至少根据您的示例代码我认为是这样)。

此外,请查看此 SO 问题,以确定是否应该使用 SendAsync() 还是 Post() 因为对于简单地将数据添加到管道中来说,Post() 可能更容易处理: TPL Dataflow, whats the functional difference between Post() and SendAsync()? . SendAsync 将返回一个任务,而 Post 将根据成功进入管道返回 true/false。

所以,要找出问题出在哪里,您需要处理块的续传。 MSDN 上有一篇不错的教程,介绍了 TPL Dataflow 的入门知识,可以在这里找到:创建一个 DataFlow 管道。它可能看起来像这样:

//link to section
bufferBlock.LinkTo(actionBlock);
//continuations
bufferBlock.Completion.ContinueWith(t =>
{
     if(t.IsFaulted)  ((IDataFlowBlock).actionBlock).Fault(t.Exception); //send the exception down the pipeline
     else actionBlock.Complete(); //tell the next block that we're done with the bufferblock
 });

你可以在等待管道时捕获异常(AggregateException)。在实际代码中,你是否真的需要使用ConcurrentDictionary来进行跟踪?因为当它无法添加数据时,可能会导致问题,因为当LinkTo谓词返回false时,它不会将数据传递到管道的下一个块。

谢谢JNYRanger,我会尝试一下。使用TryAdd而不是TryGet的目的是为了线程安全。另一个原因是ConcurrentDictionary针对读取进行了优化。在我的情况下,两个相关的ID都通过了检查并且都尝试添加。因此,在这种情况下,我选择了TryAdd,其中一个将赢得竞争条件,而另一个则被搁置。 - rizan
1
@rizan 有道理,但为了让所有内容都通过,您需要有第二个 LinkTo() 来处理添加到 ConcurrentDictionary 中的失败,否则您的管道将会中断,这就是您所遇到的问题。 - JNYRanger
您提供的方法和示例在可以等待完成的环境中表现出色。我采用了I3arnon的方法,并进行了一些微调。我使用TDD先编写测试,以便覆盖所有情况。非常感谢您的帮助。 - rizan
@rizan 很高兴我能帮到你并至少指出了正确的方向。我建议你回答自己的问题,以展示给其他人你是如何解决问题的。 - JNYRanger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接