Creating a message bus with TPL Dataflow

I was looking for a lightweight, in-process, asynchronous message bus and came across TPL Dataflow.

My current implementation is below (for a complete example, see https://gist.github.com/4416655).

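using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Bus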
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}

I have a few general questions about using TPL Dataflow for messaging.

  • Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently? This was the conclusion I came to based on this post.
  • In my implementation I'm using a single BroadcastBlock<T> instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
  • BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?
  • In my test application, I introduced a delay in the first handler:

        // Subscribe to Message type
        var subscription1 = bus.Subscribe<Message>(async m => { 
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });
    

    When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).

  • Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.

Update

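I wasn't getting the behaviour I expected with Task.Delay because it delays the execution of each handler, not the processing of all handlers. Thread.Sleep is what I needed.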

Answer
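After answering your questions (see below), I realized that modeling your design with TPL Dataflow blocks is probably not a good idea. TDF is good for processing messages through blocks that are independent of each other, and it has no built-in way of tracking a single message. But that is exactly what you seem to want: process the messages in order and track when each message has been handled.
Because of that, I think you shouldn't build a whole dataflow network. Instead, use a single ActionBlock as an asynchronous message processor: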
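using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Bus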
{
    class Subscription
    {
        public Guid Id { get; private set; }
        public Func<object, Task> HandlerAction { get; private set; }

        public Subscription(Guid id, Func<object, Task> handlerAction)
        {
            Id = id;
            HandlerAction = handlerAction;
        }
    }

    private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
    private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();

    private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;

    public Bus()
    {
        // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
        var subscriptions = new List<Subscription>();

        m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
            async tuple =>
            {
                var message = tuple.Item1;
                var completedAction = tuple.Item2;

                // could be made more efficient, probably doesn't matter
                Guid idToUnsubscribe;
                while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                {
                    subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                }

                Subscription handlerToSubscribe;
                while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                {
                    subscriptions.Add(handlerToSubscribe);
                }

                foreach (var subscription in subscriptions)
                {
                    await subscription.HandlerAction(message);
                }

                completedAction();
            });
    }

    public Task SendAsync<TMessage>(TMessage message)
    {
        var tcs = new TaskCompletionSource<bool>();
        Action completedAction = () => tcs.SetResult(true);

        m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));

        return tcs.Task;
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        return Subscribe<TMessage>(
            message =>
            {
                handlerAction(message);
                // we need a completed non-generic Task; this is a simple, efficient way to get it
                // another option would be to use async lambda with no await,
                // but that's less efficient and produces a warning
                return Task.FromResult(false);
            });
    }

    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
    {
        Func<object, Task> actionWithCheck = async message =>
        {
            if (message is TMessage)
                await handlerAction((TMessage)message);
        };

        var id = Guid.NewGuid();
        m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
        return id;
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        m_idsToUnsubscribe.Enqueue(subscriptionId);
    }
}

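I decided to use queues for subscribing and unsubscribing, so that the list of handlers can't change while a message is being processed.
Answers to your questions: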
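Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently?
Yes, at first glance BroadcastBlock<T> looks like what you want. TPL Dataflow certainly has no other block that does this directly.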
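In my implementation I'm using a single BroadcastBlock<T> instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
With a single block for all message types, you do more work on a single thread (sending every message to all handlers). With one block per message type, you do less work (sending each message only to the right handlers), and that work can run on multiple threads. Because of this, I think it's reasonable to assume the latter will be faster.
But don't forget the rules of application performance optimization: first, write code that is simple and easy to read. Only if it turns out to be actually slow, try to optimize it. And when comparing two alternatives, always use profiling to find out which one is actually faster, instead of guessing which one should be faster.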
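For comparison, here is a minimal sketch of the one-block-per-message-type alternative. It assumes System.Threading.Tasks.Dataflow is referenced; `TypedBus` and its members are hypothetical names, not taken from the question or the answer. Per the advice above, it is only worth adopting if profiling shows the shared block is a bottleneck.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Sketch only: one BroadcastBlock<TMessage> per message type instead of a
// single shared BroadcastBlock<object>. Names are illustrative.
public class TypedBus
{
    // Maps typeof(TMessage) to its BroadcastBlock<TMessage>, stored as object.
    private readonly ConcurrentDictionary<Type, object> blocks =
        new ConcurrentDictionary<Type, object>();

    private BroadcastBlock<TMessage> GetBlock<TMessage>() =>
        (BroadcastBlock<TMessage>)blocks.GetOrAdd(
            typeof(TMessage), _ => new BroadcastBlock<TMessage>(message => message));

    public Task<bool> SendAsync<TMessage>(TMessage message,
        CancellationToken cancellationToken = default) =>
        GetBlock<TMessage>().SendAsync(message, cancellationToken);

    public IDisposable Subscribe<TMessage>(Func<TMessage, Task> handler)
    {
        // No "message is TMessage" predicate is needed: this block only carries TMessage.
        var target = new ActionBlock<TMessage>(handler);
        return GetBlock<TMessage>().LinkTo(target,
            new DataflowLinkOptions { PropagateCompletion = true });
    }
}
```

One design difference to keep in mind: routing here uses the static type argument of `SendAsync<TMessage>`, so a derived message sent as its own type does not reach `Subscribe<BaseMessage>` handlers. The question's `message is TMessage` predicate would deliver it.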
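BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?
No, there is no way to configure BroadcastBlock<T> to do that. If you don't need all of BroadcastBlock<T>'s features (sending to blocks with bounded capacity that may be temporarily full, supporting non-greedy blocks as targets), you might want to write a custom version of BroadcastBlock<T> that does this.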
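If writing a full custom block feels like too much, the replay can be approximated by filtering it out. This is a hypothetical wrapper, not a TPL Dataflow type, and it assumes System.Threading.Tasks.Dataflow is referenced. Each message is tagged with a sequence number, and each new link gets a predicate that declines anything posted before the subscription was created.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper (NoReplayBroadcaster<T> is not part of TPL Dataflow):
// new subscribers skip the message the BroadcastBlock is currently holding.
public sealed class NoReplayBroadcaster<T>
{
    // Each message carries a sequence number assigned when it is posted.
    private sealed class Envelope
    {
        public Envelope(long sequence, T value) { Sequence = sequence; Value = value; }
        public long Sequence { get; }
        public T Value { get; }
    }

    // Envelopes are immutable, so the identity clone function is safe.
    private readonly BroadcastBlock<Envelope> broadcast =
        new BroadcastBlock<Envelope>(envelope => envelope);

    private long lastSequence; // highest sequence number handed out so far

    public bool Post(T value)
    {
        // Reserve the number before posting, so a later subscriber records a
        // start value >= this message's sequence number.
        long sequence = Interlocked.Increment(ref lastSequence);
        return broadcast.Post(new Envelope(sequence, value));
    }

    public IDisposable Subscribe(Action<T> handler)
    {
        // Messages numbered <= startAfter were posted before this subscription.
        long startAfter = Interlocked.Read(ref lastSequence);
        var target = new ActionBlock<Envelope>(envelope => handler(envelope.Value));

        // The predicate declines the retained message; BroadcastBlock simply moves
        // on when a target declines, so other subscribers are unaffected.
        return broadcast.LinkTo(
            target,
            new DataflowLinkOptions { PropagateCompletion = true },
            envelope => envelope.Sequence > startAfter);
    }
}
```

A message posted at the same moment `Subscribe` runs may be skipped by that new subscriber. If that matters, serialize `Post` and `Subscribe` with a lock.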
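When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).
One of the points of TDF is that each block is independent, so multiple blocks can execute on multiple threads. If that's not what you want, then using a separate ActionBlock<T> for each handler may not be the best solution. In fact, TDF may not be the best solution at all.
Also, Subscribe() accepts an Action<TMessage>, which means your async lambda is compiled as an async void method. Such a method returns to its caller at the first await, so the ActionBlock considers the message handled before the delay has even finished. This is why MaxDegreeOfParallelism = 1 made no difference. You should use async void only in specific (and relatively rare) cases. If you want to support async handlers, accept async Task methods, i.e. Func<TMessage, Task>.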
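The following plain .NET sketch needs no Dataflow. `AsyncLambdaDemo` is a made-up name. It shows the difference between the same async lambda converted to `Action<T>` and converted to `Func<T, Task>`. Only the `Func` version gives the caller something to wait on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class AsyncLambdaDemo
{
    // Reports how many log entries exist right after each delegate returns.
    public static async Task<(int afterAction, int afterFunc)> RunAsync()
    {
        var log = new ConcurrentQueue<string>();

        // Converted to Action<string>: compiled as async void. The call returns
        // at the first await, and the caller cannot tell when the body finishes.
        Action<string> asAction = async m => { await Task.Delay(200); log.Enqueue(m); };

        // Converted to Func<string, Task>: the Task completes only after the
        // whole body, including the delay, has run.
        Func<string, Task> asFunc = async m => { await Task.Delay(200); log.Enqueue(m); };

        asAction("action");
        int afterAction = log.Count; // body is still waiting on Task.Delay

        await asFunc("func");
        int afterFunc = log.Count;   // the "func" entry was written before the await resumed

        return (afterAction, afterFunc);
    }
}
```

`afterAction` is 0 because the async void body is still pending. In the question's `Subscribe`, ActionBlock only ever sees that `Action` version, so it starts the next message immediately.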

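I wasn't getting the behaviour I expected with Task.Delay because it delays the execution of each handler, not the processing of all handlers. Thread.Sleep is what I needed.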

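Thread.Sleep() goes against async, so you should avoid it if possible. Also, I don't think it actually works the way you want: it introduces a delay on each thread, but TPL Dataflow will use more than one thread, so it won't behave as you intend.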
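Instead of Thread.Sleep, a Task-returning `Subscribe` overload lets each handler's ActionBlock wait for the handler's Task. The sketch below trims down the question's Bus by removing the subscription bookkeeping. `SequentialHandlerBus` is a hypothetical name, and the code assumes System.Threading.Tasks.Dataflow is referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Trimmed-down version of the question's Bus with a Task-returning Subscribe.
public class SequentialHandlerBus
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    public Task SendAsync<TMessage>(TMessage message) => broadcast.SendAsync(message);

    public IDisposable Subscribe<TMessage>(Func<TMessage, Task> handler)
    {
        // The lambda returns the handler's Task, so ActionBlock binds to its
        // Func<object, Task> overload and awaits that Task before taking the
        // next message (the default MaxDegreeOfParallelism is 1).
        var target = new ActionBlock<object>(message => handler((TMessage)message));
        return broadcast.LinkTo(target,
            new DataflowLinkOptions { PropagateCompletion = true },
            message => message is TMessage);
    }
}
```

Each handler still has its own block, so two different handlers can run at the same time. Only the messages for one handler are serialized. Ordering across all handlers is what the single-ActionBlock Bus above provides.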

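Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.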

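PropagateCompletion, Complete() and Completion are meant for handling the completion of a whole block, not the processing of a single message. One reason for this is that in more complex dataflow networks, it might not be clear when exactly a message has been processed. For example, if a message has already been sent to all current targets of a BroadcastBlock<T>, but will also be sent to any newly added targets, should it be considered complete?
If you want this, you will have to implement it yourself somehow, probably using TaskCompletionSource.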
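The sketch below shows what PropagateCompletion is for. It assumes System.Threading.Tasks.Dataflow is referenced, and `BlockCompletionDemo` is a made-up name. Calling `Complete()` on the BroadcastBlock flows to the linked ActionBlocks, and awaiting their `Completion` tells you when everything sent before that point has been handled.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BlockCompletionDemo
{
    // Broadcasts messageCount messages to two handlers, then waits for both
    // handler blocks to finish everything they received.
    public static async Task<(int first, int second)> RunAsync(int messageCount)
    {
        var broadcast = new BroadcastBlock<int>(m => m);
        var first = new ConcurrentBag<int>();
        var second = new ConcurrentBag<int>();
        var handler1 = new ActionBlock<int>(m => first.Add(m));
        var handler2 = new ActionBlock<int>(m => second.Add(m));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(handler1, propagate);
        broadcast.LinkTo(handler2, propagate);

        for (int i = 0; i < messageCount; i++)
            await broadcast.SendAsync(i);

        // Complete() means "no more messages". With PropagateCompletion, each
        // target is completed after the broadcast block completes, and its
        // Completion task finishes once it has drained its own buffer.
        broadcast.Complete();
        await Task.WhenAll(handler1.Completion, handler2.Completion);

        return (first.Count, second.Count);
    }
}
```

This only answers "has everything been processed?". It says nothing about a single message, and once completed, the blocks accept no further messages. It fits shutdown, not per-message acknowledgement.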

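Great solution. I especially like the idea behind the subscribe/unsubscribe queues. One question though: how would you extend it to support passing a cancellation token to stop handler execution? - Ben Foster
If you want to cancel the processing of a single message, I would add a cancellationToken and a canceledAction to the Tuple (which means a custom class would probably be better at that point). You would set them in SendAsync() and use them in the subscriptions loop. Ideally, handlerAction should also accept the token (at least optionally). - svick
Thanks. I'm curious why you didn't use a TransformBlock that returns a Task. That way we could just return Task.WhenAll(subscriptions) instead of using TaskCompletionSource. - Ben Foster
I'm not sure I understand. A TransformBlock doesn't directly return the output for a given input either; you need to receive its results somehow. Of course you could implement what you want with TransformBlock, but I think that would actually be more complicated than using ActionBlock with TaskCompletionSource. - svick
OK, thanks again. By the way, I've pushed this code to GitHub: https://github.com/benfoster/Fabrik.SimpleBus - Ben Foster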
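The cancellation suggestion from the comments might look roughly like the sketch below. It is not the answer's code, and it assumes System.Threading.Tasks.Dataflow is referenced. The Tuple is replaced by a hypothetical `MessageEnvelope` class and handlers receive a `CancellationToken`. The subscribe/unsubscribe queues are left out; handlers are passed to the constructor instead. It also completes the sender's Task with a handler's exception rather than letting the ActionBlock fault. In the answer's version, a fault would leave that Task pending forever.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical replacement for Tuple<object, Action>.
public sealed class MessageEnvelope
{
    public MessageEnvelope(object message, CancellationToken token)
    {
        Message = message;
        Token = token;
    }

    public object Message { get; }
    public CancellationToken Token { get; }

    // Continuations run asynchronously, so the sender's code does not execute
    // inside the ActionBlock's processing loop.
    public TaskCompletionSource<bool> Completion { get; } =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

// Sketch of the single-ActionBlock design with per-message cancellation.
public class CancellableBus
{
    private readonly ActionBlock<MessageEnvelope> processor;

    public CancellableBus(IEnumerable<Func<object, CancellationToken, Task>> handlers)
    {
        var subscriptions = new List<Func<object, CancellationToken, Task>>(handlers);

        processor = new ActionBlock<MessageEnvelope>(async envelope =>
        {
            try
            {
                foreach (var handler in subscriptions)
                {
                    // Checked before each handler; handlers may also observe the token.
                    envelope.Token.ThrowIfCancellationRequested();
                    await handler(envelope.Message, envelope.Token);
                }
                envelope.Completion.TrySetResult(true);
            }
            catch (OperationCanceledException) when (envelope.Token.IsCancellationRequested)
            {
                envelope.Completion.TrySetCanceled();
            }
            catch (Exception ex)
            {
                // Hand the failure to the sender instead of faulting the whole block.
                envelope.Completion.TrySetException(ex);
            }
        });
    }

    public Task SendAsync(object message, CancellationToken cancellationToken = default)
    {
        var envelope = new MessageEnvelope(message, cancellationToken);
        if (!processor.Post(envelope))
        {
            envelope.Completion.TrySetException(
                new InvalidOperationException("The bus is no longer accepting messages."));
        }
        return envelope.Completion.Task;
    }
}
```

A message whose token is already canceled when it reaches the front of the queue ends up as a canceled Task, and none of its handlers run.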
