将SqsListener配置为长轮询

5

我正在使用Spring Boot实现SQS订阅,经过在互联网上的一些研究后,我找到了项目spring-cloud

使用注释@SqsListener很容易从主题接收消息,但我想将其实现为长轮询而不是短轮询,以便只有在有新消息时才会接收到消息。

    @SqsListener(
        value = ["queue"],
        deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
    )
    fun subscribeToSSmsg: String) {
    ....
    }

目前代码运行顺畅,但我想使用长轮询来接收消息。在spring-cloud中有没有实现它的方法?

1个回答

7

使用AWS Java SDK 1.X。

您需要定义一个自定义 org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory bean,它需要一个 com.amazonaws.services.sqs.AmazonSQSAsync bean。

将以下bean添加到您的配置中。

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setWaitTimeOut(20); //Long polling, the maximum value of 20 seconds
    factory.setAmazonSqs(amazonSqs);
    factory.setMaxNumberOfMessages(10); //The maximum number of messages you want to read in a single poll
    return factory;
}

@Bean
@Primary
AmazonSQSAsync amazonSQSAsync(AWSCredentialsProvider credentialsProvider) {
    return new AmazonSQSBufferedAsyncClient(
        AmazonSQSAsyncClientBuilder
            .standard()
            .withRegion("region") //Set an appropriate region
            .withCredentials(credentialsProvider)
            .build());
}

同时确保您使用的HTTP客户端的超时时间比您设置的长轮询超时时间更长。


它是如何工作的?我的方法会接收一个消息列表吗? - placplacboom
有一个单一的轮询线程和多个工作线程(默认每个@SqsListener注解2个,您可以提供此数量),用于处理消息。每个工作线程一次只会接收1条消息。如果您想深入了解,请查看SimpleMessageListenerContainer的源代码。在添加SqsListener注释和编写处理单个消息的函数时,没有任何更改。所有这些协调轮询并将它们交给工作人员最终在成功时删除您的消息都为您处理。 - GSSwain
谢谢您的解释,但我还是有些不明白。我正在使用Localstack进行本地测试,当我在队列上发布单个消息时,我的函数subscribeToSSmsg()会自动调用 - 它会被每个到达的消息调用。我原以为我会在20秒内收到10条消息 - 每20秒,我可以收到10条消息。我错过了什么? - placplacboom
不是这样的,我想知道是因为我在文档中看到 AmazonSQSBufferedAsyncClient 不支持 FIFO 队列,所以我在想是否可以在 FIFO 上进行长轮询。 - placplacboom
你是正确的,如果你有一个FIFO队列,请不要使用AmazonSQSBufferedAsyncClient。 - GSSwain
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接