Spring Integration:延迟后消息被释放两次

4

我正在使用以下的XML片段:

<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
                                  queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
    <int:delayer id="commandDelayer" default-delay="30000"/>
    <int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>

<int:payload-type-router input-channel="commands">
....
....

它正在执行以下任务:
  1. 从名为“commands”的RabbitMQ队列中消耗消息。
  2. 将消息执行延迟30秒。
  3. 在指定的延迟后继续执行消息。
如果应用程序在上述代码启动之前已经存在于命令队列中,则在启动时,应用程序会在单独的线程中两次执行该消息。
我认为我知道为什么会发生这种情况。
Spring会重新安排存储在DelayHandler消息存储中的消息,一旦应用程序上下文完全初始化。请参考以下代码片段:DelayHandler.java
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!this.initialized.getAndSet(true)) {
        this.reschedulePersistedMessages();
    }
}

如果消息在应用程序启动之前已经存在于RabbitMQ队列中,在Spring上下文初始化期间,消息将从队列中获取并添加到DelayHandler的消息存储中。一旦完成上下文初始化,并且在此期间未释放消息,则上述代码段会重新安排相同的消息。

现在,当两个独立的线程执行同一条消息时,如果一个线程已经执行了该消息,则应该从消息存储中删除该消息,而另一个线程不应继续执行。

在线程执行时,DelayHandler.java中下面的代码片段允许第二个线程释放重复的消息,导致同一消息重复执行,因为消息存储是SimpleMessageStore的实例,并且没有进一步的检查来停止执行。

private void doReleaseMessage(Message<?> message) {
    if (this.messageStore instanceof SimpleMessageStore
            || ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
        this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
        this.handleMessageInternal(message);
    }
    else {
        if (logger.isDebugEnabled()) {
            logger.debug("No message in the Message Store to release: " + message +
                    ". Likely another instance has already released it.");
        }
    }
}

这是Spring Integration中的一个bug吗?

1个回答

3

哦,好的!

这是一个非常不错的bug。

感谢您指出这个问题!

请提出JIRA issue,我们将在下一版本中讨论此事。

我可以解释一下发生了什么。

所有Spring Integration都从Lifecycle.start()开始工作。在您的情况下,<int-amqp:inbound-channel-adapter>从RabbitMQ接收消息并将其发送到集成流中 ,然后它们会被delayed

只有在应用上下文触发ContextRefreshedEvent时,DelayHandler才会捕获该事件,并从messageStore中获取所有消息,并像您注意到的那样进行reschedule

因此,是的,我们可能会为同一条消息拥有两个定时任务。

有趣的是,这仅适用于SimpleMessageStore,因为它没有removeMessage功能来删除存储在groups中的消息。

我认为有几种变通方法:

  1. 延迟<int-amqp:inbound-channel-adapter>start,例如,从<inbound-channel-adapter>处理相同的ContextRefreshedEvent,并将@amqpAdapter.start()命令消息发送到<control-bus>

  2. 另一个选择是自Spring Integration 4.1起,它的名字是Idempotent Receiver。使用此工具,您可以丢弃重复的消息,我猜idempotentKey恰好是messageId。清洁的Idempotent Receiver模式!

  3. 还有一种选择,就是使用persistentMessageStore,在那里我们确实可以依赖于removeMessage操作。

关于此事的JIRA票:https://jira.spring.io/browse/INT-3560


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接