我先为标题道歉,但我想这是最能描述这个行为的方式了。
需求是处理消息总线上的请求。进来的请求可能涉及到相关或者分组的id。我想要的行为是对于一系列处理相关id的请求进行同步处理。然而不同的id可以异步进行处理。
我使用一个ConcurrentDictionary来跟踪正在处理的请求和LinkTo中的谓词。
这应该提供了对相关请求的同步处理。
但是我收到的行为是第一个请求被处理了,第二个请求被丢弃了。
我已经附加了一个从控制台应用程序中模拟此问题的示例代码。
任何指导或反馈都将不胜感激。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApplication2
{
class Program
{
static void Main(string[] args)
{
var requestTracker = new ConcurrentDictionary<string, string>();
var bufferBlock = new BufferBlock<Request>();
var actionBlock = new ActionBlock<Request>(x =>
{
Console.WriteLine("processing item {0}",x.Name);
Thread.Sleep(5000);
string itemOut = null;
requestTracker.TryRemove(x.Id, out itemOut);
});
bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));
var publisher = Task.Run(() =>
{
var request = new Request("item_1", "first item");
bufferBlock.SendAsync(request);
var request_1 = new Request("item_1", "second item");
bufferBlock.SendAsync(request_1);
});
publisher.Wait();
Console.ReadLine();
}
}
public class Request
{
public Request(string id, string name)
{
this.Id = id;
this.Name = name;
}
public string Id { get; set; }
public string Name { get; set; }
}
}