使用Camel AMQP连接的Camel RabbitMQ连接

5

我正在尝试使用camel-amqp(版本2.17)组件在我的camel路由中连接rabbitmq。

我已按以下配置进行设置:

@Bean
    CachingConnectionFactory jmsCachingConnectionFactory(){

        JmsConnectionFactory pool = new JmsConnectionFactory();
        pool.setRemoteURI("amqp://127.0.0.1:5672");
        pool.setUsername("guest");
        pool.setPassword("guest");

        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(pool);
        return cachingConnectionFactory;
    }

    @Bean
    JmsConfiguration jmsConfig(){

        JmsConfiguration configuration = new JmsConfiguration();
        configuration.setConnectionFactory(jmsCachingConnectionFactory());
       // configuration.setCacheLevelName("CACHE_CONSUMER");
        return configuration;
    }

    @Bean
    AMQPComponent amqp(){
        AMQPComponent component = new AMQPComponent();
        component.setConfiguration(jmsConfig());
        return  component;
    }

我收到的错误信息是:

javax.jms.JMSException: 远程主机强制关闭了一个现有的连接 at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:66) ~[qpid-jms-client-0.8.0.jar:0.8.0]

在我的rabbitmq日志中,我看到了下面的消息,但我无法理解:

*

** Reason for termination == 
** {function_clause,
       [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
            [{list,
                 [{symbol,<<"amqp:accepted:list">>},
                  {symbol,<<"amqp:rejected:list">>},
                  {symbol,<<"amqp:released:list">>},
                  {symbol,<<"amqp:modified:list">>}]}],
            [{file,"src/rabbit_amqp1_0_link_util.erl"},{line,49}]},
        {rabbit_amqp1_0_link_util,outcomes,1,
            [{file,"src/rabbit_amqp1_0_link_util.erl"},{line,49}]},
        {rabbit_amqp1_0_outgoing_link,attach,3,
            [{file,"src/rabbit_amqp1_0_outgoing_link.erl"},{line,41}]},
        {rabbit_amqp1_0_session_process,with_disposable_channel,2,
            [{file,"src/rabbit_amqp1_0_session_process.erl"},{line,377}]},
        {rabbit_amqp1_0_session_process,handle_control,2,
            [{file,"src/rabbit_amqp1_0_session_process.erl"},{line,197}]},
        {rabbit_amqp1_0_session_process,handle_cast,2,
            [{file,"src/rabbit_amqp1_0_session_process.erl"},{line,134}]},
        {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1049}]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}
=ERROR REPORT==== 8-Jul-2016::17:09:27 ===
closing AMQP connection <0.29082.0> (127.0.0.1:55479 -> 127.0.0.1:5672):
{handshake_error,running,<0.29104.0>,
    {{symbol,<<"amqp:internal-error">>},
     "Session error: ~p~n~p~n",
     [function_clause,
      [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
           [{list,
                [{symbol,<<"amqp:accepted:list">>},
                 {symbol,<<"amqp:rejected:list">>},
                 {symbol,<<"amqp:released:list">>},
                 {symbol,<<"amqp:modified:list">>}]}],
           [{file,"src/rabbit_amqp1_0_link_util.erl"},{line,49}]},
       {rabbit_amqp1_0_link_util,outcomes,1,
           [{file,"src/rabbit_amqp1_0_link_util.erl"},{line,49}]},
       {rabbit_amqp1_0_outgoing_link,attach,3,
           [{file,"src/rabbit_amqp1_0_outgoing_link.erl"},{line,41}]},
       {rabbit_amqp1_0_session_process,with_disposable_channel,2,
           [{file,"src/rabbit_amqp1_0_session_process.erl"},{line,377}]},
       {rabbit_amqp1_0_session_process,handle_control,2,
           [{file,"src/rabbit_amqp1_0_session_process.erl"},{line,197}]},
       {rabbit_amqp1_0_session_process,handle_cast,2,
           [{file,"src/rabbit_amqp1_0_session_process.erl"},{line,134}]},
       {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1049}]},
       {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]]}}

我在RabbitMQ中启用了amqp_1_0插件。有人可以帮我解决这个问题吗?


有一个camel-rabbitmq组件,为什么不使用它呢? - Souciance Eqdam Rashti
Camel-rabbitmq 运行良好,但唯一的问题是似乎缺少事务支持。我没有看到在组件中设置我的 tx 管理器的方法。这就是我开始研究 amqp 的原因,因为它使用具有 tx 支持的 jms 组件。 - VGaur
RabbitMQ不鼓励使用AMQP事务,因为它们非常慢 - 开发者自己的话是,它们会使吞吐量降低250倍!我知道这并不能解决你的问题,但我的建议是尝试在关闭autoAck选项并启用发布者确认的情况下使用RabbitMQ。从2.17.0版本开始,Camel支持确认功能,请参阅RabbitMQ组件文档了解详情。 - Miloš Milivojević
1个回答

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接