使用SqsListener与SNS和SQS

14
我将使用spring-cloud-awsSqsListener来接收AWS Simple Queue Service (SQS)中以JSON格式从AWS的SNS HTTP通知发送的消息。

以下是监听器的代码:

@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void handle(final MyObject obj) throws Exception {
// ...
}

上述链接的文档仅涉及将普通序列化对象发送到队列并读取,我认为接收SNS消息应该可以直接使用。但实际上我遇到了转换错误:

10:45:51.480 [simpleMessageListenerContainer-2] ERROR o.s.c.a.m.l.SimpleMessageListenerContainer - 处理消息时出现异常。 org.springframework.messaging.MessagingException: 调用处理程序方法时发生异常;嵌套异常是 org.springframework.messaging.converter.MessageConversionException: 找不到转换器以转换为类com.myproject.model.MyObject, 消息=GenericMessage

我还尝试创建一个外壳对象,看起来与上面链接的预期SNS Json格式相同,但我仍然遇到了相同的异常。唯一有效的类型是签名中的字符串。难道SNS不应该自动转换吗?

2个回答

9

是的,它应该可以。实际上也确实如此。

为了在反序列化时调用正确的HandlerMethodArgumentResolver(在本例中为NotificationMessageArgumentResolver),从而调用正确的转换器NotificationRequestConverter,您只需要在方法签名中添加注释org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage即可。例如:

@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void handle(final @NotificationMessage MyObject obj) throws Exception {
// ...
}

这样可以将SNS的Message部分提取出来并转换为MyObject

你如何能够从SQS接收对象作为字符串?难道SQS只支持字符串作为消息吗? - teuber789
@jtcotton63 是的,负载以 String 形式传递 - 一个 JSON 对象 - @NotificationMessage 调用一个转换器将负载转换为所需类的对象。 - msp
4
补充此答案,对于我而言,仅仅添加@NotificationMessage注释并不能解决问题,因为我正在使用自己的ArgumentResolver覆盖QueueMessageHandlerFactory。这也会是很多人面临的情况,因为这是必要的以便自定义Jackson映射器。在这种情况下,必须将解析器更改为:从 factory.setArgumentResolvers(List.of(new PayloadArgumentResolver(jacksonMessageConverter))); 改为 factory.setArgumentResolvers(List.of(new NotificationMessageArgumentResolver(jacksonMessageConverter))); - Yonatan Wilkof

4
这也可以在没有@NotificationMessage的情况下工作。这样你就不需要发送“Type”和“Message”部分,这是必需的才能使用此注释。
首先创建一个具有所需属性的类。
public class SqsMessage {

   private String myTask;

   public SqsMessage() {
   }

   public SqsMessage(@JsonProperty("MyTask") String myTask ) {
       this.myTask = myTask ;
   }

   //Getter + Setter 
}

接下来设置监听器

@SqsListener(value = {"MyQueue"}, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(SqsMessage payload, @Headers Map<String, Object> header) {
   logger.info("Got message with task: " + payload.getTask() 
    + " with custom attribute " + header.get("CustomAttribute").toString());
}

现在您可以发送类似以下的JSON数据:
{"MyTask":"My task"}

POJO(Plain Old Java Object)中的构造函数中的@JsonProperty("MyTask")注释可以是可选的,这取决于您使用的Spring版本以及您的属性是否与Json字符串中的名称相同。例如,如果您的属性称为task,而您的Json字符串为{"task":"My task"},则这不是必需的。


但问题是关于接收SNS消息,其中TypeMessage格式的一部分。 - msp
虽然时间有些久远,但据我所记, Message 被映射到 POJO 上,而 Type 信息则是头部的一部分。我的回答与被接受的答案完全相同,但我没有使用 @NotificationMessage 注释,这在我的情况下会导致其他解析问题。JSON 只是额外加分。 - Chris

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接