避免 JMS/ActiveMQ 上重复的消息

22
有没有办法在ActiveMQ服务器上定义的队列中抑制重复的消息?
我尝试手动定义JMSMessageID,(message.setJMSMessageID(“uniqueid”)),但服务器忽略了此修改,并传递具有内置生成的JMSMessageID的消息。
按规范,我没有找到关于如何去重复消息的参考。
在HornetQ中,为了解决这个问题,我们需要在消息定义上声明HQ特定属性org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。
例如:
Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

有人知道是否有类似的解决方案适用于ActiveMQ吗?

6个回答

7

2
我对这种方法能否解决我的问题存在疑问。在队列中,我需要仅保留一个带有相同JMSMessageID的消息实例,仅在此实例期间有效,并将其作为Set运行。在最新的idem元素从队列中移除之后,我希望能够放置具有相同JMSMessageID的其他消息。我需要实现并进行测试。但是,基于EAI书中描述的幂等性,我认为该概念与我的需求不符。但是,所提出的解决方案很好。我会进一步研究并在此处评论我的结果。谢谢 - Andre Pastore

5
我怀疑ActiveMQ是否原生支持它,但实现幂等消费者应该很容易。一种方法是在生产者端为每个消息添加一个唯一标识符,现在在使用存储(数据库、缓存等)的消费者端,可以进行检查以查看是否之前接收过该消息,并基于该检查继续处理。
我看到之前有一个类似的stackoverflow问题-Apache ActiveMQ 5.3-如何配置队列以拒绝重复消息? ,这也可能有所帮助。

1
由于消费者本身可以是多线程的,为了确定它是否是重复的,必须实现分布式/内存锁定。对吧? - user1401472

4

现在ActiveMQ传输中支持去重复消息的操作。请查看连接配置指南中的配置值auditDepthauditMaximumProducerNumber


4
你实际上如何配置这些参数以避免重复? - Thomas
@Thomas 我不确定你在问什么。是关于如何在ActiveMQ中应用配置的一般方法?还是关于这些特定字段要使用哪些值? - Chris Pitman
只是从参数的描述来看,对我来说不太清楚。例如,auditDepth这个参数,它的值是指筛选重复消息的数量还是字节数量?关于auditMaximumProducerNumber,这是否意味着有限制的生产者将被筛选?顺便问一下,如果两个不同的订阅者发布了相同内容的消息,那么这条消息是否会被认为是重复的? - Thomas
1
@Chris 如果我理解正确,这些参数可以保证在最多64个生产者上的2048条消息中检测到重复项。但是ActiveMQ如何确定什么是重复项呢?如果是通过JMSMessageID来判断的话,那我们又回到了起点,因为我们无法设置它。 - Antares42

3

有一种方法可以让ActiveMQ根据JMS属性过滤重复消息。它涉及编写一个ActiveMQ插件。将重复消息发送到死信队列的基本代理过滤器如下:

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.broker.ProducerBrokerExchange;

public class DuplicateFilterBroker extends BrokerFilter {
    String messagePropertyName;
    boolean switchValue;

    public DuplicateFilterBroker(Broker next, String messagePropertyName) {
        super(next);
        this.messagePropertyName = messagePropertyName;
    }

    public boolean hasDuplicate(String propertyValue){
        switchValue = propertyValue;
        return switchValue;
    }

    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception { 
        ActiveMQMessage amqmsg = (ActiveMQMessage)msg; 
        Object msgObj = msg.getMessage(); 
        if (msgObj instanceof javax.jms.Message) { 
            javax.jms.Message jmsMsg = (javax.jms.Message) msgObj; 
            if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) {
                super.send(producerExchange, msg);
            }
            else {
               sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
            } 
        }
    }  
}

这个插件是如何决定使用哪个属性来进一步过滤重复消息的?如果能解释一下使用案例,将非常有帮助来集成这样的插件。感谢您提前的回答。 - Hayra

0

你尝试过设置 jms.checkForDuplicates=true 吗?


0

看起来问题中建议的方法也适用于ActiveMQ(2016/12)。请参阅activemq-artemis指南。这需要生产者将特定属性设置到消息中。

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id";   // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

然而,包含该属性的类是不同的:org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID,而属性值为_AMQ_DUPL_ID

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接