我们正在使用Rebus作为与Sql Server配合的队列系统。我们有几个不同类型消息的接收者。每个消息可以由某些特定类型的多个工作者处理。
每条消息应该只被一个工作者处理/处理(第一个拉取它的工作者)。如果某个工作者由于某种原因无法完成任务,则会使用超时服务将其推迟。
如果我理解正确,它将成为一个TimeoutRequest并放入超时表中。当重新运行时,它变成一个TimeoutReply,然后再次作为原始消息重新引入队列。
我们遇到的问题是,当它变成TimeoutReply时,所有工作者都会拿起它并创建原始消息。当超时时,一个原始消息会变成多个消息(数量与工作者一样多)。
我们的Rebus设置如下:
“服务器端”:
"Worker side":
“工作端”:
每条消息应该只被一个工作者处理/处理(第一个拉取它的工作者)。如果某个工作者由于某种原因无法完成任务,则会使用超时服务将其推迟。
如果我理解正确,它将成为一个TimeoutRequest并放入超时表中。当重新运行时,它变成一个TimeoutReply,然后再次作为原始消息重新引入队列。
我们遇到的问题是,当它变成TimeoutReply时,所有工作者都会拿起它并创建原始消息。当超时时,一个原始消息会变成多个消息(数量与工作者一样多)。
我们的Rebus设置如下:
“服务器端”:
var adapter = new BuiltinContainerAdapter();
Configure.With(adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
.CreateBus()
.Start();
return adapter;
"Worker side":
“工作端”:
_adapter = new BuiltinContainerAdapter();
Configure.With(_adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
.EnsureTableIsCreated())
.Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
.Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
.Behavior(x => x.SetMaxRetriesFor<Exception>(0))
.Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
.CreateBus().Start(numberOfWorkers);
非常感谢您的帮助解决问题或提供理解!
rebus.timeout
(假定为本地)。 - mookid8000