当Rebus超时时,我们如何避免多个Rebus消息的出现?

5
我们正在使用Rebus作为与Sql Server配合的队列系统。我们有几个不同类型消息的接收者。每个消息可以由某些特定类型的多个工作者处理。
每条消息应该只被一个工作者处理/处理(第一个拉取它的工作者)。如果某个工作者由于某种原因无法完成任务,则会使用超时服务将其推迟。
如果我理解正确,它将成为一个TimeoutRequest并放入超时表中。当重新运行时,它变成一个TimeoutReply,然后再次作为原始消息重新引入队列。
我们遇到的问题是,当它变成TimeoutReply时,所有工作者都会拿起它并创建原始消息。当超时时,一个原始消息会变成多个消息(数量与工作者一样多)。
我们的Rebus设置如下:
“服务器端”:
        var adapter = new BuiltinContainerAdapter();
        Configure.With(adapter)
            .Logging(l => l.Log4Net())
            .Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
            .CreateBus()
            .Start();

        return adapter;

"Worker side":
“工作端”:
        _adapter = new BuiltinContainerAdapter();
        Configure.With(_adapter)
            .Logging(l => l.Log4Net())
            .Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
                .EnsureTableIsCreated())
            .Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
            .Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
            .Behavior(x => x.SetMaxRetriesFor<Exception>(0))
            .Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
            .CreateBus().Start(numberOfWorkers);

非常感谢您的帮助解决问题或提供理解!

1个回答

1
我能想到的唯一原因是每个工作进程都充当一个超时管理器,并且它们似乎共享相同的存储,所以你可能会遇到多个超时回复。这样,由于查询到期超时时超时管理器不使用任何锁定或其他东西,它们可能会抓取相同的到期超时,从而导致多个超时回复 - 即有竞态条件,但由于this SQL没有注意到是否实际删除了行,所以这种情况不会被注意到。
我建议您要么为工作进程使用单独的超时表(例如_inputQueue + ".timeouts"),要么让所有工作进程使用外部超时管理器(即省略Timeouts(x => ...)并启动一个独立的专用超时管理器)。
在您的情况下,我认为(a)是最简单的方法,因为它与您现在拥有的东西非常接近。
不过,我自己更喜欢(b),通常每台托管Rebus端点的机器都有一个超时管理器。
请让我知道是否解决了您的问题。

此外,我很好奇你对SQL传输的运作情况如何 :)


感谢您宝贵的反馈。您所描述的情况可能是导致此问题的原因。我正在研究使用(b)。当我查看配置文件时,似乎我们需要为每个输入队列(收件人类型)拥有一个超时管理器。 - jopa
如果您选择使用专用的超时管理器,您只需要一个 - 如果您没有配置超时管理器的端点地址,则默认值为rebus.timeout(假定为本地)。 - mookid8000
您可以通过使用配置部分来配置超时管理器地址。 - mookid8000
再次感谢。我已经报告了一个关于外部超时管理器的错误。希望您有时间来查看它。https://github.com/rebus-org/Rebus/issues/241 - jopa
这不太像是一个错误,更像是一种有意的遗漏。我已经在我的评论中详细阐述了(https://github.com/rebus-org/Rebus/issues/242#issuecomment-46645031)。 - mookid8000
SqlServerTimeoutStorage.cs的链接已经失效。 - Jose Rodriguez

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接