Spring AMQP转换器问题使用Rabbit Listener

22

我觉得这里可能有些问题...我正在尝试创建一个简单的rabbit listener,可以接受自定义对象作为消息类型。现在根据文档所说:

在1.6版本之前,必须在消息头中提供类型信息以将JSON转换,或者需要一个自定义的ClassMapper。从1.6版本开始,如果没有类型信息头,则可以从目标方法参数中推断类型。

我手动将消息放入队列,使用RabbitMQ面板得到错误消息:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.Customer] for GenericMessage [payload=byte[21], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=customer, amqp_deliveryTag=1, amqp_consumerQueue=customer, amqp_redelivered=false, id=81e8a562-71aa-b430-df03-f60e6a37c5dc, amqp_consumerTag=amq.ctag-LQARUDrR6sUcn7FqAKKVDA, timestamp=1485635555742}]

我的配置:

@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new    CachingConnectionFactory("localhost");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test1234");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

另外一个问题是,这个异常消息没有被放回队列中。

我正在使用 Spring Boot 1.4,它带有 AMQP 1.6.1。

Edit1:我已经像上面那样添加了 Jackson 转换器(在 Spring Boot 中可能不需要),并在 RabbitMQ 管理界面上设置了内容类型,但仍然收到以下错误,正如您在上面看到的,我还没有配置任何监听器容器。

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.Customer] for GenericMessage [payload=byte[21], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=customer, content_type=application/json, amqp_deliveryTag=3, amqp_consumerQueue=customer, amqp_redelivered=false, id=7f84d49d-037a-9ea3-e936-ed5552d9f535, amqp_consumerTag=amq.ctag-YSemzbIW6Q8JGYUS70WWtA, timestamp=1485643437271}]
2个回答

32
如果您使用的是boot,您可以在配置中简单地添加一个Jackson2JsonMessageConverter @Bean, 它会自动连接到listener(只要它是唯一的转换器)。如果您正在使用管理控制台发送消息,则需要将content_type属性设置为application/json

默认情况下,转换错误被视为致命错误,因为通常没有原因重试;否则它们将永远循环。

EDIT

这是一个可以工作的boot应用程序......

@SpringBootApplication
public class So41914665Application {

    public static void main(String[] args) {
        SpringApplication.run(So41914665Application.class, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("foo", false, false, true);
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @RabbitListener(queues = "foo")
    public void listen(Foo foo) {
        System.out.println(foo);
    }


    public static class Foo {

        public String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

我发送了这条消息

发布 json 消息

得到了如下结果:

2017-01-28 21:49:45.509  INFO 11453 --- [           main] com.example.So41914665Application        : Started So41914665Application in 4.404 seconds (JVM running for 5.298)
Foo [bar=baz]

引导程序将为您定义一个管理员和模板。


请到这个链接获取相关代码:https://github.com/stackspring/sample。这将有助于更好地确定问题所在。请pull该代码并尝试一下。 - user3444718
1
你需要在RabbitConfig类中添加@Configuration注解 - 否则boot无法看到转换器bean,因此也无法将其连接。顺便说一下,你不需要admin和template beans;boot的自动配置会为你添加它们。你也不需要一个ConnectionFactory;你可以将你的凭据放在application.yml(或.properties)中。请参阅boot参考文档关于RabbitMQ自动配置。当我添加了@Configuration时,你的示例对我有效。我还建议升级到1.4.4。 - Gary Russell
非常抱歉浪费您的时间 :(。我应该更加注意。下次发帖之前我会仔细检查两遍。是的,我不需要那些bean只是为了我做过的事情,但是我计划在这上面再尝试几个东西。 - user3444718

0
遇到了同样的问题,原来是 git stash/merge 弄乱了我的配置,我需要再次在我的主要文件中包含这个包:
@SpringBootApplication(scanBasePackages = {
        "com.example.amqp"  // <- git merge messed this up
})
public class TeamActivityApplication {

    public static void main(String[] args) {
        SpringApplication.run(TeamActivityApplication.class, args);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接