JMS - 从单个消费者到多个消费者

25

我有一个JMS客户端,用于产生消息并将其通过JMS队列发送给唯一的消费者。

我想要的是多个消费者获取这些消息。我脑海中首先想到的是将队列转换为主题,这样当前和新的消费者都可以订阅并获取相同的消息。

这显然需要修改当前客户端代码中的生产者和消费者方面的内容。

我还想看看其他选项,比如创建第二个队列,这样我就不必修改现有的消费者。我认为这种方法有优点,例如(如果我错了,请纠正我)在两个不同的队列之间平衡负载,而不是一个队列,这可能对性能有积极的影响。

我希望得到关于这些选项以及您可能看到的优缺点的建议。非常感谢任何反馈。

2个回答

52

根据您的陈述,您有几个选择。

如果要将其转换为主题以获得相同的效果,则需要将消费者转换为持久化消费者。队列提供的一件事是,如果您的消费者不活动,它可以保持持久性。这将取决于您使用的MQ系统。

如果您想坚持使用队列,那么您将为每个消费者创建一个队列,并创建一个分发器来侦听原始队列。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

主题的优点

  • 更容易动态添加新的消费者。所有消费者都将获得新消息,而不需要任何工作。
  • 您可以创建轮询主题,使得Consumer_1会收到一条消息,然后是Consumer_2,再然后是Consumer_3。
  • 消费者可以接收新的消息,而不必查询队列,使它们具有响应性。

主题的缺点

  • 除非您的代理支持此配置,否则消息不是持久性的。如果消费者离线并回来,则可能会错过消息,除非设置了持久性消费者。
  • 难以让Consumer_1和Consumer_2接收消息,但不让Consumer_3接收。使用调度程序和队列,调度程序无法将消息放入Consumer_3的队列中。

队列的优点

  • 消息在消费者删除它们之前是持久的。
  • 调度程序可以通过不将消息放入相应的消费者队列中来筛选哪些消费者接收哪些消息。尽管通过筛选器也可以通过主题实现。

队列的缺点

  • 需要创建额外的队列来支持多个消费者。在动态环境中,这将不是高效的。

在开发消息系统时,我更喜欢使用主题,因为它给了我最大的控制权,但是既然您已经在使用队列,那么要实现主题,需要更改系统工作方式。

多消费者队列系统的设计和实现

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

来源

请注意,您还需要处理其他问题,例如异常处理、重新连接到连接和队列(如果失去连接),等等。这只是为了让您了解如何完成我所描述的内容。

在实际系统中,我可能不会在第一个异常处退出。我会允许系统尽可能地继续运行并记录错误。就目前而言,在此代码中,如果将消息放入单个消费者队列失败,整个调度程序将停止。

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();

那是一个很好的答案。我正在使用JBoss的MOM实现,也就是HornetQ。 - Gonzalo Garcia Lasurtegui
@Anonimo 最后一次我检查JBoss绝对遵循JMS规范。这在过去给我带来了一些挫败感,因为我动态创建主题,而JMS规范没有考虑到这一点。其他像ActiveMQ这样的工具允许您动态创建主题,并且只需要更改JBoss中的1行代码即可实现相同的功能。 - Andrew T Finnell
谢谢安德鲁。您能否详细说明一下使用多个队列的想法?根据您的解释,生产者代码不会改变,但是不确定调度程序位于何处以及在技术术语中如何表示。 - Gonzalo Garcia Lasurtegui
@Anomimo 是的,我很快就会发布它。 - Andrew T Finnell

5
您可能不需要修改代码;这取决于您编写的方式。
例如,如果您的代码使用MessageProducer发送消息而不是QueueSender,那么它将适用于主题和队列。同样,如果您使用MessageConsumer而不是QueueReceiver,也是如此。
基本上,在JMS应用程序中,使用非特定接口与JMS系统进行交互,如MessageProducerMessageConsumerDestination等,是一个好的实践。如果是这种情况,那么只需进行“简单”的配置即可。

那是一个不错的选择。不幸的是,我们正在使用像QueueSender这样的特定接口。如果我们重构代码的话,我一定会记住这件事。 - Gonzalo Garcia Lasurtegui

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接