TxSelect and TransactionScope

5
最近,我一直在研究使用C#实现发布/订阅的方法,其中RabbitMQ是我的关注点。我更习惯于使用NServiceBus。NServiceBus通过在TransactionScope中注册MSMQ来处理事务。其他支持事务的操作也可以在同一个TransactionScope中注册(如MSSQL),因此一切都是真正原子的。在底层,NSB引入了MSDTC进行协调。
我发现,在RabbitMQ的C#客户端API中有一个IModel.TxSelect()IModel.TxCommit()。这很好地解决了在提交之前不向交换机发送消息的问题。这覆盖了向交换机发送多个需要原子处理的消息的情况。然而,是否有一种良好的方法将数据库调用(比如MSSQL)与RabbitMQ事务同步?

你的系统预计需要什么样的吞吐量? - kzhen
@kzhen 我并不担心性能问题。但一致性很重要。我将使用持久化的交换机和队列。每天的吞吐量不会太高,大约在50-100,000条消息左右。 - Davin Tryon
3个回答

16

您可以通过实现IEnlistmentNotification接口编写一个RabbitMQ资源管理器,以供MSDTC使用。该实现为事务管理器提供了两阶段提交通知回调函数,用于参与事务。请注意,MSDTC的代价很高,会严重降低您的整体性能。

RabbitMQ资源管理器示例:

sealed class RabbitMqResourceManager : IEnlistmentNotification
{
    private readonly IModel _channel;

    public RabbitMqResourceManager(IModel channel, Transaction transaction)
    {
        _channel = channel;
        _channel.TxSelect();
        transaction.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public RabbitMqResourceManager(IModel channel)
    {
        _channel = channel;
        _channel.TxSelect();
        if (Transaction.Current != null)
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public void Commit(Enlistment enlistment)
    {
        _channel.TxCommit();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {           
        Rollback(enlistment);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared();
    }

    public void Rollback(Enlistment enlistment)
    {
        _channel.TxRollback();
        enlistment.Done();
    }
}

使用资源管理器的示例

using(TransactionScope trx= new TransactionScope())
{
    var basicProperties = _channel.CreateBasicProperties();
    basicProperties.DeliveryMode = 2;

    new RabbitMqResourceManager(_channel, trx);
    _channel.BasicPublish(someExchange, someQueueName, basicProperties, someData);
    trx.Complete();
}

有趣的是...但最終我們選擇了一個無 MSDTC 解決方案。謝謝將此添加到問題中,希望其他人也會發現它有用。 :) - Davin Tryon
我在我的应用程序中使用相同的方法。然而,我发现IEnlistmentNotification的Commit/Prepare方法在不同的线程中被调用(这似乎是System.Transaction中的正常行为),这会导致RabbitMQ出现一些问题,因为官方文档指出IModel不应该跨线程使用。你有遇到过这个问题吗? - Normand Bedard
不,我没有。我认为他们指的是线程并发,因为该通道不是线程安全的。因此,如果多个线程同时使用该通道,就会出现问题。 - user2930590

5
据我所知,目前没有办法将TxSelect/TxCommit与TransactionScope协调使用。
当前的做法是使用持久消息的耐用队列来确保它们可以在RabbitMQ重启后继续存在。然后,在从队列中消费时,我读取一条消息并进行一些处理,然后将记录插入数据库,完成所有操作后ACK(确认)该消息,并将其从队列中删除。这种方法的潜在问题是,如果消息提交到数据库,但连接到RabbitMQ断开,导致无法在ACK之前将消息发送回服务器,则该消息可能会被处理两次。但对于我们正在构建的系统,我们关注吞吐量。(我相信这称为“至少一次”方法)。
RabbitMQ网站确实指出了使用TxSelect和TxCommit存在显著的性能损失,因此建议对两种方法进行基准测试。
无论如何,您都需要确保您的消费者能够处理可能被处理两次的消息。
若未找到相关内容,请查看RabbitMQ的.Net用户指南, 可在这里找到:此处,具体为第3.5节。

是的,我同意你写的一切。然而,在我的特定情况下,消费者不是问题所在。我遇到的情况是生产者可能会提交数据库状态,然后发布消息。但是,我完全同意消费者端需要幂等消息的需求。感谢你提醒我们关于 TxSelect 和 TxCommit 性能的问题。 - Davin Tryon
也许你的方法可以让生产者先提交数据库行,然后发送消息,当服务器确认接收后再更新该行以表示消息已发送(http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/)。如果生产者在重新上线时崩溃,它可以查找尚未发布其消息的行,然后重新发送它们。 - kzhen
是的,我认为你说得对。我们计划提交消息需要使用原始数据库事务发送,然后由调度程序接收并发送到Rabbit。最后,我们将提交消息已发送的信息。感谢提供链接! - Davin Tryon

0
假设您已经为抽象IServiceBus实现了服务总线。我们可以假装它在底层使用的是rabbitmq,但它肯定不需要这样。
当您调用servicebus.Publish时,您可以检查System.Transaction.Current以查看是否处于事务中。如果您正在进行mssql服务器连接的事务,则可以将其发布到sql服务器内的代理队列,该队列将尊重您执行的任何数据库操作的提交/回滚(您需要在此处进行一些连接魔法,以避免代理发布将您的txn升级为msdtc)
现在您需要创建一个服务,该服务需要读取代理队列并实际发布到Rabbit。这样,对于非常重要的事情,您可以保证您的数据库操作已经完成,并且消息将来某个时候(当服务中继它时)发布到Rabbit。如果在提交代理时发生异常,则仍然可能出现故障,但问题的窗口大大缩小,最坏的情况是您最终会多次发布,但您永远不会丢失消息。这是非常不可能的,例如,在接收后但提交之前SQL服务器离线将导致您最终至少双倍发布(当服务器重新上线时,您将再次发布)。您可以构建智能服务以减轻一些问题,但除非使用msdtc及其所有内容(呀!)或构建自己的msdtc(呀呀!),否则您将面临潜在的故障,这完全取决于使窗口变小和不太可能发生。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接