ActiveMQ-发布者订阅者实例初探

21

有两个程序:订阅者和发布者... 订阅者能够将消息放入主题中并成功发送。 当我在浏览器上检查ActiveMQ服务器时,它显示已排队的1条消息。但是当我运行消费者代码时,它没有接收到该消息。

这是生产者代码:

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class producer {

    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testt");

        MessageProducer producer = session.createProducer(topic);

        // We will send a small text message saying 'Hello'

        TextMessage message = session.createTextMessage();

        message.setText("HELLO JMS WORLD");
        // Here we are sending the message!
        producer.send(message);
        System.out.println("Sent message '" + message.getText() + "'");

        connection.close();
    }
}

运行这段代码后,控制台的输出为:

26 Jan, 2012 2:30:04 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Sent message 'HELLO JMS WORLD'

这里是消费者代码:

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testt");

        MessageConsumer consumer = session.createConsumer(topic);

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };

        consumer.setMessageListener(listner);
        connection.close();

    }
}    

运行这段代码后没有显示任何内容。 有人可以帮我克服这个问题吗?


1
你可能关闭了连接太快。在你的消费者开始消费之前,连接就被关闭了,主方法也结束了! - Ravi Bhatt
4个回答

17

你的问题是消费者正在运行,然后立即关闭。

尝试将以下内容添加到消费者中:

    consumer.setMessageListener(listner);

    try {
        System.in.read();
    } catch (IOException e) {
        e.printStackTrace();
    }

    connection.close();

这将会等待你按下一个键后再停止。

其他需要考虑的事项:

  • 使用finally块进行关闭操作
  • Java命名规范鼓励在类的第一个字母使用大写字母

不好意思,您提到的代码也无法运行 :( 请帮帮我... 我在项目中卡住了! - Vijet Badigannavar

13
除了应用程序关闭得太快之外,主要问题是您正在向一个主题发送消息。主题不会保留消息,因此如果您运行生成消息的应用程序,然后运行消费者,消费者将无法接收任何信息,因为它在发送消息时没有订阅该主题。如果您解决了关闭问题并在一个终端中运行消费者,然后运行生产者,您应该能看到消费者接收到的消息。如果您需要消息保留,则需要使用队列来保留消息,直到有人消费它。

4
抱歉,朋友们,程序错误已修复。 我先执行了发布者模块,然后才执行订阅者模块...... 但是应该先执行订阅者模块,再执行发布者模块...... 感谢建议 :) - Vijet Badigannavar

3

你的生产者类是正确的,它可以顺利运行。

但是,你的消费者类是不正确的,你需要对其进行修改。

  • 首先,在创建连接对象后添加setClientID("any_string_value")

    例如:Connection connection = connectionFactory.createConnection(); // 需要设置setClientID值,任何字符串值都可以 connection.setClientID("12345");

  • 其次,使用createDurableSubscriber()方法代替createConsumer()方法来通过主题传输消息。

    MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");

这是修改后的消费者类:

package mq.test;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();

        // need to setClientID value, any string value you wish
        connection.setClientID("12345");

        try{
        connection.start();
        }catch(Exception e){
            System.err.println("NOT CONNECTED!!!");
        }
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("test_data");

        //need to use createDurableSubscriber() method instead of createConsumer() for topic
        // MessageConsumer consumer = session.createConsumer(topic);
        MessageConsumer consumer = session.createDurableSubscriber(topic,
                "SUB1234");

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };

        consumer.setMessageListener(listner);
        //connection.close();

    }
}

现在,你的代码将会成功运行。

2

以下是需要翻译的内容:

  • 使用队列而不是主题进行工作。当没有消费者可用时,主题中的消息将被丢弃,它们是非持久性的。
  • 在设置消息监听器后添加connection.start()。当所有消费者/生产者都正确设置时,应启动连接。
  • 在关闭连接之前等待一段时间。

主题可能是您最重要的故障源。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接