Amazon SQS和Alpine SQS Spring Boot中批量消费消息

4
我已经配置了SQS监听器以消耗消息列表,但我一次只收到一条消息,并且出现了错误:“无法将model.StudentData转换为java.util.ArrayList的实例”。
我的代码如下:
@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void receiveMessage(final StudentData studentData,
                               @Header("SenderId") final String senderId, final Acknowledgment acknowledgment) {

        // business logic
        acknowledgment.acknowledge();
    }


任何关于如何配置 SQS 监听器以消费多个消息的建议都会受到赞赏。
感谢您的帮助。
2个回答

3

上述问题的解决方案是:

final ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
        while (true) {
            final String queueUrl = amazonSqs.getQueueUrl("enter your queue name").getQueueUrl();
            final var receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
                    .withWaitTimeSeconds(20);

            List<Message> messages = amazonSqs.receiveMessage(receiveMessageRequest).getMessages();

            while (messages.size() > 0) {
                for (final Message queueMessage : messages) {
                    try {
                        String message = queueMessage.getBody();
                        amazonSqs.deleteMessage(new DeleteMessageRequest(queueUrl, queueMessage
                                .getReceiptHandle()));
                    } catch (Exception e) {
                        log.error("Received message with errors " + e);
                    }
                }
                messages = amazonSqs.receiveMessage(new ReceiveMessageRequest(queueUrl)).getMessages();
            }
        }
    });
        executorService.shutdown();

2
SQS监听器注释提供了最简单的配置,它将逐个消费消息。这种限制直接来自Spring的QueueMessagingTemplate。
要消费批次,您可以直接使用AmazonSQS客户端。
    @Autowire AmazonSQSAsync amazonSqs;
    ...

    String queueUrl = amazonSqs.getQueueUrl("queueName").getQueueUrl();
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
    receiveMessageRequest.setQueueUrl(queueUrl);
    receiveMessageRequest.setWaitTimeSeconds(10); // Listener for messages in the next 10 seconds
    receiveMessageRequest.setMaxNumberOfMessages(1000); // If 10000 messages are read stop listening
    ReceiveMessageResult receiveMessageResult = amazonSqs.receiveMessage(receiveMessageRequest);
    receiveMessageResult.getMessages(); // batch of messages

setMaxNumberOfMessages 只允许设置1到10之间的值。因此,这段代码很可能无法按预期工作。还请查看 JavaDoc 类 ReceiveMessageRequest - Oliver Marienfeld

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接