使用核心API消费后,HornetQ消息仍然留存在队列中。

7

我是HornetQ的新手,请多多包涵。首先,让我告诉您我的要求:

我需要一种消息队列中间件,可以在不同进程之间传递大约1K大小的消息,具有低延迟和持久性(即它应该能够在系统崩溃后恢复)。我将有多个进程向同一队列写入,同样地,多个进程也将从同一队列读取。

因此,我选择了HornetQ,因为它在带持久性的消息传递方面评价最好。

我目前使用的是Hornetq v2.2.2Final作为独立服务器
我能够成功使用核心API (ClientSession)创建持久/非持久队列,并成功将消息发布到队列(ClientProducer)中。
同样,我能够使用核心API (ClientConsumer)从队列中读取消息。

问题出现在客户端读取消息后,消息仍然保留在队列中,即队列中的消息数量保持不变。也许我理解有误,但我认为一旦消息被消费(读取+确认),它就会从队列中删除。但在我的情况下,这种情况并没有发生,同一条消息一遍又一遍地被读取。

此外,我还想说,我已经尝试过使用非持久队列和非持久消息解决问题,但问题依然存在。

我正在使用的生产者代码:

public class HQProducer implements Runnable {

    private ClientProducer producer;
    private boolean killme;
    private ClientSession session;
    private boolean durableMsg;

    public HQProducer(String host, int port, String address, String queueName,
            boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
        this.durableMsg = durableMsg;
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            if (queueExists(queueName)) {
                if (deleteQ) {
                    System.out.println("Deleting existing queue :: " + queueName);
                    session.deleteQueue(queueName);
                    System.out.println("Creating queue :: " + queueName);
                    session.createQueue(address, queueName, true);
                }
            } else {
                System.out.println("Creating new  queue :: " + queueName);
                session.createQueue(address, queueName, durable);
            }
            producer = session.createProducer(SimpleString.toSimpleString(address), pRate);

            killme = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killme) {
            try {
                ClientMessage message = session.createMessage(durableMsg);

                message.getBodyBuffer().writeString("Hello world");

                producer.send(message);
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("Producer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killme) {
        this.killme = killme;
    }

    private boolean queueExists(String qname) {
        boolean res = false;
        try {
            //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
            QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
            if (queueQuery.isExists()) {
                res = true;
            }
        } catch (HornetQException ex) {
            res = false;
        }
        return res;
    }
}

同时消费者的代码是:
public class HQConsumer implements Runnable {

    private ClientSession session;
    private ClientConsumer consumer;
    private boolean killMe;

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            session.start();

            consumer = session.createConsumer(queueName, "",0,-1,browseOnly);

            killMe = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killMe) {
            try {
                ClientMessage msgReceived = consumer.receive();
                msgReceived.acknowledge();
                //System.out.println("message = " + msgReceived.getBodyBuffer().readString());
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("ConSumer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killMe) {
        this.killMe = killMe;
    }
}

HornetQ服务器配置:

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <paging-directory>${data.dir:../data}/paging</paging-directory>

   <bindings-directory>${data.dir:../data}/bindings</bindings-directory>

   <journal-directory>${data.dir:../data}/journal</journal-directory>

   <journal-min-files>10</journal-min-files>

   <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

   <connectors>
      <connector name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </connector>

      <connector name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
      </connector>
   </connectors>

   <acceptors>
      <acceptor name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </acceptor>

      <acceptor name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
         <param key="direct-deliver" value="false"/>
      </acceptor>
   </acceptors>

   <security-settings>
      <security-setting match="#">
         <permission type="createNonDurableQueue" roles="guest"/>
         <permission type="deleteNonDurableQueue" roles="guest"/>
         <permission type="createDurableQueue" roles="guest"/>
         <permission type="deleteDurableQueue" roles="guest"/>
         <permission type="consume" roles="guest"/>
         <permission type="send" roles="guest"/>
      </security-setting>
   </security-settings>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>10485760</max-size-bytes>       
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <address-full-policy>BLOCK</address-full-policy>
      </address-setting>
   </address-settings>

</configuration>

根据此文档,在处理完消息后需要确认收到消息,你是否已经这样做了? - Asad Rasheed
2个回答

14

使用hornetq核心API,您需要显式确认一条消息。我没有看到这在您的测试中发生。

如果您不确认消息,则这就是为令您的消息被阻塞的原因。我需要看到您完整的示例才能给您一个完整的答案。

另外:您应该使用以下方式定义createSession:createSession(true, true, 0)

核心API具有批量确认(ACKs)选项。您没有使用事务会话,因此直到达到serverLocator配置的ackBatchSize,才会将确认发送到服务器。有了这个设置,只要您调用消息的acknowledge(),任何确认将立即发送到服务器。

您当前使用的选项等效于具有某些DUPS_SIZE的JMS DUPS_OK。

(在与您进行一些迭代后编辑了我的初始回答)


1
ClientMessage msgReceived = consumer.receive(); msgReceived.acknowledge(); 我正在确认这段代码。 - Vivek Mehra
核心API有一个批量ACK的选项。您没有使用事务会话,因此在达到服务器定位器配置的ackBatchSize之前,不会将ACK发送到服务器。您应该使用以下方式定义createSession:createSession(true, true, 0);这样一来,只要您调用message的acknowledge()方法,任何ACK都会立即发送到服务器。 - Clebert Suconic
1
你没有回复这个帖子,所以我假设你已经解决了你的问题? - Clebert Suconic
我曾经遇到过同样的问题,这个方法似乎可以解决它。我显然没有理解createSession(boolean autoCommitSends, boolean autoCommitAcks, int ackBatchSize)的Java文档。autoCommitSends和autoCommitAcks与确认部分无关,只与提交有关,是这样吗? - Alper Akture
@AlperAkture 不确定你的问题是否得到了回答,但是考虑到 Acks 在名称中,至少其中一个与确认有关。 - dimwittedanimal
@dimwittedanimal 不确定,我记不得那么久以前的事情了 :-/ - Alper Akture

2

设置 ackbatchsize 帮助我解决了这个问题。感谢你的帮助。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接