Apache Camel,RabbitMQ如何发送消息/对象

8

我希望有人能够提供一些关于此问题的帮助。

我正在使用Camel RabbitMQ进行测试,尝试向队列发送消息,并在RabbitMQ界面中显示并读取它。

但我无法使其正常工作。

我认为起作用的是,在RabbitMQ管理界面的Exchange选项卡中创建了一个新的Exchange。 在我的Java代码中,我将消息发送到该Exchange。当代码执行时,我可以在Web界面中看到一个峰值,显示已收到某些内容,但我无法看到已接收的内容。当我尝试读取时,无法读取并出现以下错误:

< in route: Route(route2)[[From[rabbitmq:// 192.168.59.103:5672 / rt ...因为Route route2没有输出处理器。您需要添加路线的输出,例如to(“log:foo”)。

是否有人能够提供一个实际的例子,告诉我如何发送消息,查看它并读取它?任何展示此过程的教程也将不胜感激。

谢谢

=================第二部分

我现在遇到的错误是:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot redeclare exchange 'rhSearchExchange' in vhost '/' with different type, durable, internal or autodelete value, class-id=40, method-id=10), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 47 more
我有以下设置: 我遇到了以下错误,我相信我在URI方面做错了一些事情,并且我必须定义一些我缺少的额外参数 我的交换机是直接类型 我的队列是持久类型 我的URI是: rabbitmq://192.168.59.105:5672/rhSearchExchange?username=guest&password=guest&routingKey=rhSearchQueue 对此有什么建议吗? 谢谢。

有关此事的任何建议? - cpu2007
我一直在寻找类似的教程... 我可以将消息发布到交换机,但我无法从camel中消费它们。然而,对于您的错误,我认为问题在于您没有将消息路由到任何地方。例如,我相信您的配置是 from("rabbitmq:localhost..."); 但它应该是 from("rabbitmq:localhost:...").to("foo:bar")foo:bar 可以是类似于 mock:result 的东西。 - Jeff
谢谢您的回复。我明白我必须使用“to”来流式传输/保存它。我已经更新了我的线程,以显示我现在遇到的错误。如果有人对如何解决这个问题有任何建议,请告诉我。 - cpu2007
请从第二部分开始阅读,因为那是我现在遇到的错误。 - cpu2007
2个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
10

昨天我解决了这个问题,我遇到了与你类似(或至少相似)的问题。

在RabbitMQ URI中的选项必须与您创建交换器时的选项完全匹配。例如,在我的配置中,我有一个名为tasks的直接类型的持久化交换器,并未配置autodelete选项。请注意,rabbitmq camel组件中autodelete选项的默认值为true。此外,我想用路由键camel获取消息。这意味着我的 rabbitmq URI 需要像这样:

rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel

此外,我想从一个现有队列中读取,名为task_queue,而不是让rabbitmq骆驼组件声明自己的队列。因此,我还需要添加一个额外的查询参数,所以我的rabbitmq URI是

rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue

这个配置对我很有效。下面是一些Java代码片段,展示了如何配置交换机和队列、发送消息以及我的Camel路由器配置。

交换机和队列配置:

rabbitConnFactory = new ConnectionFactory();
rabbitConnFactory.setHost("localhost");
final Connection conn = rabbitConnFactory.newConnection();
final Channel channel = conn.createChannel();

// declare a direct, durable, non autodelete exchange named 'tasks'    
channel.exchangeDeclare("tasks", "direct", true); 
// declare a durable, non exclusive, non autodelete queue named 'task_queue'
channel.queueDeclare("task_queue", true, false, false, null); 
// bind 'task_queue' to the 'tasks' exchange with the routing key 'camel'
channel.queueBind("task_queue", "tasks", "camel"); 

发送消息:

channel.basicPublish("tasks", "camel", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello, world!".getBytes());

Camel 路由:

@Override
public void configure() throws Exception {
    from("rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue")
        .to("mock:result");
}

我希望这可以帮到你!


非常感谢 llawj,它起作用了。我添加了autoDelete=false,队列的名称以及已经存在的routingkey。因此,基本上问题在于获得正确的URI。我遇到的一个问题是,当它读取队列时,它会读取其中所有的消息?我尝试在URI中将prefetchEnabled=true和prefetchCount=1设置为true,但这会导致错误。有没有关于如何获取一条消息的建议?谢谢! - cpu2007
说实话,我不太确定。我刚开始使用Camel / RabbitMQ,并且还没有需要限制我读取的消息数量。 - Jeff
我会看看是否有办法做到。谢谢你的帮助。 - cpu2007
从Camel 2.14开始似乎不需要执行手动交换、队列和绑定声明。请查看http://camel.apache.org/rabbitmq.html。有一个“declare”选项,默认为true。 - Dr.Khu
1
@cpu2007,您看到的是什么错误?除了prefetchCount = 1之外,您是否尝试过将autoAck设置为false?这样,broker在您处理当前消息之前不会再发送更多消息,直到您发送ack。 - Art

5

因为这是谷歌上关于rabbitmq/camel集成的热门搜索结果,所以我觉得有必要对这个主题再做一些补充。对我来说,缺乏简单的camel示例令人惊讶。

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Test;

public class CamelTests {
    CamelContext context;
    ProducerTemplate producer;
    ConsumerTemplate consumer;
    Endpoint endpoint;

    @Test
    public void camelRabbitMq() throws Exception {
        context = new DefaultCamelContext();

        context.start();

        endpoint = context.getEndpoint("rabbitmq://192.168.56.11:5672/tasks?username=benchmark&password=benchmark&autoDelete=false&routingKey=camel&queue=task_queue");

        producer = context.createProducerTemplate();

        producer.setDefaultEndpoint(endpoint);
        producer.sendBody("one");
        producer.sendBody("two");
        producer.sendBody("three");
        producer.sendBody("four");
        producer.sendBody("done");

        consumer = context.createConsumerTemplate();
        String body = null;
        while (!"done".equals(body)) {
            Exchange receive = consumer.receive(endpoint);
            body = receive.getIn().getBody(String.class);
            System.out.println(body);
        }

        context.stop();

    }

}

Producer-/ConsumerTemplates' JavaDoc 中:「重要提示: 确保在您使用完模板后调用 org.apache.camel.ProducerTemplate.stop() | ConsumerTemplate.stop() 以清理任何资源。」 - Gerold Broser

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,