使用TPL Dataflow创建消息总线


我在寻找一款轻量级的、进程内的异步消息总线,然后发现了TPL Dataflow。


public class Bus
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
        return SendAsync<TMessage>(message, CancellationToken.None);

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
        return broadcast.SendAsync(message, cancellationToken);

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);

    public void Unsubscribe(Guid subscriptionId)
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))

    private Guid AddSubscription(IDisposable subscription)
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;


  • Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently? This was the conclusion I came to based on this post.
  • In my implementation I'm using a single BroadcastBlock<T> instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
  • BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is is possible to change this behaviour (new subscriptions should only receive new messages).
  • In my test application, I introduced a delay in the first handler:

        // Subscribe to Message type
        var subscription1 = bus.Subscribe<Message>(async m => { 
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);

    When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).

  • Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.




回答完您的问题后(见下文),我意识到使用TPL Dataflow块对您的设计进行建模可能不是一个好主意。TDF适用于通过相互独立的块处理消息,没有内置的跟踪单个消息的方法。但这正是您想要的:按顺序处理消息,并跟踪每个消息的完成情况。
public class Bus
    class Subscription
        public Guid Id { get; private set; }
        public Func<object, Task> HandlerAction { get; private set; }

        public Subscription(Guid id, Func<object, Task> handlerAction)
            Id = id;
            HandlerAction = handlerAction;

    private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
    private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();

    private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;

    public Bus()
        // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
        var subscriptions = new List<Subscription>();

        m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
            async tuple =>
                var message = tuple.Item1;
                var completedAction = tuple.Item2;

                // could be made more efficient, probably doesn't matter
                Guid idToUnsubscribe;
                while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                    subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);

                Subscription handlerToSubscribe;
                while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))

                foreach (var subscription in subscriptions)
                    await subscription.HandlerAction(message);


    public Task SendAsync<TMessage>(TMessage message)
        var tcs = new TaskCompletionSource<bool>();
        Action completedAction = () => tcs.SetResult(true);

        m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));

        return tcs.Task;

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
        return Subscribe<TMessage>(
            message =>
                // we need a completed non-generic Task; this is a simple, efficient way to get it
                // another option would be to use async lambda with no await,
                // but that's less efficient and produces a warning
                return Task.FromResult(false);

    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
        Func<object, Task> actionWithCheck = async message =>
            if (message is TMessage)
                await handlerAction((TMessage)message);

        var id = Guid.NewGuid();
        m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
        return id;

    public void Unsubscribe(Guid subscriptionId)

是的,乍一看,“BroadcastBlock”似乎是您想要的。在TPL Dataflow中,肯定没有类似的块直接提供。
此外,Subscribe()接受Action<TMessage>,这意味着您的lambda将被编译为异步void方法。只有在特定(且相对罕见)情况下,您才应该使用它们。如果要支持async处理程序,则应接受async Task方法,即Func<TMessage, Task>


如果可能的话,使用Thread.Sleep()与异步相违背,不应使用它。此外,我认为它实际上并没有按照您想要的方式工作:它向每个线程引入了一个延迟,但TPL Dataflow将使用多个线程,因此这样做不会像您想的那样运行。



很棒的解决方案。我特别喜欢订阅/取消订阅队列背后的想法。但有一个问题,您如何扩展它以支持传递取消令牌来停止处理程序的执行? - Ben Foster
如果您想取消单个消息的处理,我会在Tuple中添加cancellationTokencanceledAction(这意味着此时使用自定义类可能更好)。您将在SendAsync()中设置它们,并在subscritpions循环中使用它们。理想情况下,handlerAction也应该接受令牌(至少是可选的)。 - svick
谢谢。我很好奇,为什么不使用返回任务的TransformBlock。这样我们就可以直接返回Task.WhenAll(subscriptions)而不是使用TaskCompletionSource了。 - Ben Foster
我不确定我理解了。 TransformBlock 对于某些输入仍然不会直接返回输出,您需要以某种方式接收其结果。当然,您可以使用TransformBlock实现所需的功能,但我认为使用ActionBlockTaskCompletionSource实际上会比使用TransformBlock更复杂。 - svick
好的,再次感谢。顺便说一下,我已经将这段代码推送到了GitHub上。https://github.com/benfoster/Fabrik.SimpleBus - Ben Foster

