ActiveMQ和嵌入式代理

12

编辑:重新表述问题:

我想在我的服务器和客户端应用程序之间使用ActiveMQ作为信使服务。

我正在尝试设置一个嵌入式代理(即不是一个单独的进程),以处理为我的客户生成的消息。这个队列是持久化的。

代理的初始化如下:

BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();

经过一番调试之后,我最终得到的服务器部分代码是:

public static class HelloWorldProducer implements Runnable {
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need
            Connection connection = connectionFactory.createConnection(); 
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

客户端非常相似,看起来像这样:

public static class HelloWorldConsumer implements Runnable, ExceptionListener {
    public void run() {
        try {
          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost");
            Connection connection = connectionFactory.createConnection(); // exception happens here...
            connection.start();
            connection.setExceptionListener(this);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("*****Received: " + text);
            } else {
                System.out.println("*****Received obj: " + message);
            }
            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

主方法只是启动每个线程以开始生产/接收消息。

...但我在每个线程的启动时遇到了以下问题:

2013-01-24 07:54:31,271 INFO  [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost)
2013-01-24 07:54:31,281 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost
2013-01-24 07:54:31,302 INFO  [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state
2013-01-24 07:54:31,339 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: []
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry
    java.rmi.server.ExportException: internal error: ObjID already in use
    at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186)
    at sun.rmi.transport.Transport.exportObject(Transport.java:92)
    at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247)
    at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
    at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
        <snip....>

看起来消息已经成功地生产和消费了(我之前发布的其他问题已经解决),但上述异常让我感到担忧。

编辑:在代理器关闭期间,我现在也收到了以下提示:

2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception:
    java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
    at java.lang.Thread.run(Thread.java:722)

你应该包含所有的代理创建代码,这样我们才能看到你配置了什么。如果不需要JMX,可以通过broker.setUseJmx(false)来禁用它。 - Tim Bish
澄清一下,显示的消息是DEBUG级别的。这不一定是一个错误。它可能只是提供信息。您是否在实际生产/消费消息时遇到错误?问题在这一点上并不清楚。 - cmonkey
我已经完全重新表述了问题。本质上,我在问三个子问题。(1) 异常,(2) 丢失的消息和(3) 持久性。感谢您查看我的问题。 - Jaco Van Niekerk
看起来你正在使用非常老的ActiveMQ版本,因为它默认使用AMQ持久性适配器,请建议您切换到5.8.0版本。 - Tim Bish
你想在服务器内运行嵌入式代理(即ActiveMQ本身),还是只需要一个JMS生产者/消费者?如果是前者,请提供有关如何尝试运行该代理的详细信息。 - SirRichie
@TimBish:我已经升级到5.7.0了。我在mvn仓库中没有看到5.8.0版本。看起来一切都正常,谢谢。 - Jaco Van Niekerk
2个回答

15

你可以通过多种方式将经纪人嵌入到代码中,其中许多方式都在这里有文档记录。由于你使用的默认值为现在已弃用的AMQ存储而不是更新的KahaDB存储,所以你可能需要尝试升级您的版本。因为客户端线程使用不同的连接工厂,它们可能会争相创建虚拟机经纪人,所以你可能会遇到问题。如果你在生产者上设置create=false选项并确保消费者线程在此之后启动,可能会解决这个问题;或者你可以事先创建VM经纪人,并将create=false添加到两个线程中,这可能会奏效。

BrokerService broker = new BrokerService();
// configure the broker
broker.setBrokerName("localhost");
broker.setUseJmx(false);
broker.start();

然后在客户端代码中,只需通过此连接工厂配置进行附加。

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");

谢谢,我已经这样做了。然而,我仍然担心上面的异常。你知道它是否可以安全地忽略吗? - Jaco Van Niekerk
1
异常看起来是在告诉你,客户端连接失败了,因为在客户端仍然连接的时候你关闭了代理。如果你正在关闭应用程序,这不是一个很大的问题。从问题中看来,你的客户端使用的是TCP而不是VM连接工厂,尽管这并不完全清楚。 - Tim Bish
是的...那确实是问题所在,可以安全地忽略异常。为了避免这种情况,客户端需要在服务器(包括代理)关闭之前关闭。 - Jaco Van Niekerk

4
当我运行你的代码时,我遇到了以下异常:
javax.jms.JMSException: Could not connect to broker URL: tcp://localhost. 
Reason java.lang.IllegalArgumentException: port out of range:-1

您的代理在运行并侦听61616端口,因此任何试图连接到代理的客户端都需要在其URL中指定该端口。

客户端代码试图连接到本地主机,但没有提到它必须连接的端口。 生产者和消费者代码都需要修复。

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost");

To

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

修复了端口问题后,我能够运行你的代码。

谢谢Satish。我已经成功解决了这个问题。抱歉,我应该关闭这个问题。 - Jaco Van Niekerk

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接