I was looking for a lightweight, in-process, asynchronous message bus and came across TPL Dataflow.
My current implementation is below (full example at https://gist.github.com/4416655).

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class Bus
    {
        // Single broadcast source shared by all message types.
        private readonly BroadcastBlock<object> broadcast =
            new BroadcastBlock<object>(message => message);

        // Link handles, keyed by subscription id, so a link can be disposed later.
        private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
            = new ConcurrentDictionary<Guid, IDisposable>();

        public Task SendAsync<TMessage>(TMessage message)
        {
            return SendAsync<TMessage>(message, CancellationToken.None);
        }

        public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
        {
            return broadcast.SendAsync(message, cancellationToken);
        }

        public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
        {
            var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

            // The predicate forwards only messages of type TMessage to this handler.
            var subscription = broadcast.LinkTo(handler,
                new DataflowLinkOptions { PropagateCompletion = true },
                message => message is TMessage);

            return AddSubscription(subscription);
        }

        public void Unsubscribe(Guid subscriptionId)
        {
            IDisposable subscription;
            if (subscriptions.TryRemove(subscriptionId, out subscription))
            {
                // Disposing the link stops further messages reaching the handler.
                subscription.Dispose();
            }
        }

        private Guid AddSubscription(IDisposable subscription)
        {
            var subscriptionId = Guid.NewGuid();
            subscriptions.TryAdd(subscriptionId, subscription);
            return subscriptionId;
        }
    }

I have some general questions about using TPL Dataflow in a messaging scenario.
- Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently? This was the conclusion I came to based on this post.
- In my implementation I'm using a single BroadcastBlock<T> instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
- BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?
- In my test application, I introduced a delay in the first handler:

      // Subscribe to Message type
      var subscription1 = bus.Subscribe<Message>(async m =>
      {
          await Task.Delay(2000);
          Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
      });

  When sending messages I expected to see each message output to the console one by one, at 2s intervals. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler, but I'm curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).
- Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do, but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.
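
To illustrate the last point: PropagateCompletion forwards the completion of a whole block, not of an individual message. The sketch below is a minimal example, not part of the Bus class above; the CompletionDemo class and its RunAsync method are hypothetical names. It shows that the linked ActionBlock only completes after Complete() is called on the source and all queued messages have been handled.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the original Bus.
public static class CompletionDemo
{
    // Sends two messages, completes the source, and returns how many
    // messages the handler processed once its Completion task finished.
    public static async Task<int> RunAsync()
    {
        var handled = 0;

        var broadcast = new BroadcastBlock<object>(message => message);

        // Default MaxDegreeOfParallelism is 1, so the counter is only
        // touched by one handler invocation at a time.
        var handler = new ActionBlock<object>(message => { handled++; });

        broadcast.LinkTo(handler, new DataflowLinkOptions { PropagateCompletion = true });

        await broadcast.SendAsync("first");
        await broadcast.SendAsync("second");

        // Completion is a block-level signal: it flows to the handler
        // only after Complete() is called on the source.
        broadcast.Complete();
        await handler.Completion;

        return handled;
    }
}
```

Awaiting handler.Completion therefore answers "has this block finished for good?", which is why it cannot be used to learn when the handlers for one particular message have run.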
Update
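I wasn't getting the behaviour I expected from Task.Delay because it delayed the execution of each handler, not the processing of all handlers. Thread.Sleep was what I needed.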
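
A possible alternative to Thread.Sleep, offered as a sketch rather than as the fix used above: an async lambda passed where an Action<TMessage> is expected compiles to an async void method, so the ActionBlock treats the handler as finished at its first await. ActionBlock<T> also has a constructor that takes a Func<T, Task>, and in that case the block waits for the returned task before taking the next message. The AsyncHandlerDemo class and its parameters are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the original Bus.
public static class AsyncHandlerDemo
{
    // Posts `count` messages to a block whose handler awaits Task.Delay and
    // returns the elapsed milliseconds recorded at the end of each handler run.
    public static async Task<List<long>> RunAsync(int count, int delayMs)
    {
        var completedAt = new List<long>();
        var clock = Stopwatch.StartNew();

        // The async lambda binds to the Func<int, Task> constructor, so the
        // block awaits each handler before starting the next one
        // (MaxDegreeOfParallelism defaults to 1).
        var handler = new ActionBlock<int>(async message =>
        {
            await Task.Delay(delayMs);
            completedAt.Add(clock.ElapsedMilliseconds);
        });

        for (var i = 0; i < count; i++)
        {
            handler.Post(i);
        }

        handler.Complete();
        await handler.Completion;
        return completedAt;
    }
}
```

Applied to the Bus, this would presumably mean a Subscribe overload that accepts Func<TMessage, Task> and passes an async delegate to the ActionBlock, keeping the handler thread free while the delay elapses.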
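[...] add cancellationToken and canceledAction to the Tuple (which means a custom class would probably be better at this point). You would set them in SendAsync() and use them in the subscriptions loop. Ideally, handlerAction should also accept the token (at least optionally). - svick

[...] TransformBlock. That way we could simply return Task.WhenAll(subscriptions) instead of using TaskCompletionSource. - Ben Foster

TransformBlock still doesn't directly return the output for a given input; you need to receive its results somehow. You certainly could use TransformBlock to do what you want, but I think that would actually be more complicated than using ActionBlock with TaskCompletionSource. - svick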
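
For readers following the comments, here is a rough sketch of the ActionBlock plus TaskCompletionSource idea they refer to. It is an assumption about the general shape, not the code from the answer being discussed: the AwaitableBus class and the Tuple layout (message, completion source) are hypothetical, and cancellation is left out. Each subscription is an ActionBlock, and SendAsync hands every subscription its own TaskCompletionSource so the caller can await all handlers for that one message.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch, not the original Bus and not the answer's code.
public class AwaitableBus
{
    private readonly List<ActionBlock<Tuple<object, TaskCompletionSource<bool>>>> subscriptions
        = new List<ActionBlock<Tuple<object, TaskCompletionSource<bool>>>>();

    public void Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<Tuple<object, TaskCompletionSource<bool>>>(item =>
        {
            try
            {
                // Run the handler only for matching message types.
                if (item.Item1 is TMessage)
                {
                    handlerAction((TMessage)item.Item1);
                }
                item.Item2.SetResult(true);
            }
            catch (Exception ex)
            {
                // Surface handler failures to whoever awaits SendAsync.
                item.Item2.SetException(ex);
            }
        });

        lock (subscriptions)
        {
            subscriptions.Add(handler);
        }
    }

    // The returned task completes once every subscription has processed the message.
    public Task SendAsync<TMessage>(TMessage message)
    {
        ActionBlock<Tuple<object, TaskCompletionSource<bool>>>[] snapshot;
        lock (subscriptions)
        {
            snapshot = subscriptions.ToArray();
        }

        var pending = new List<Task>();
        foreach (var subscription in snapshot)
        {
            var tcs = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            subscription.Post(Tuple.Create((object)message, tcs));
            pending.Add(tcs.Task);
        }

        return Task.WhenAll(pending);
    }
}
```

With this shape, `await bus.SendAsync(message)` returns only after all handlers have run, and a handler exception faults the awaited task instead of faulting the block.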