Spring Cloud SQS - 轮询间隔

13

使用Spring Cloud监听AWS SQS队列的方法如下:

@SqsListener(value = "${queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void queueListener(String message, @Headers Map<String, Object> sqsHeaders) {
    // code
}

Spring配置:

<aws-messaging:annotation-driven-queue-listener
    max-number-of-messages="10" wait-time-out="20" visibility-timeout="3600"
    amazon-sqs="awsSqsClient" />

AwsSqsClient:

@Bean
public com.amazonaws.services.sqs.AmazonSQSAsyncClient awsSqsClient() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    return new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), executorService);
}

这个很好用。

如上面的代码所示,将10个线程配置为处理SQS客户端中的这些消息。这也很好用,任何时候最多处理10条消息。

问题是,我找不到控制轮询间隔的方法。默认情况下,Spring在所有线程都空闲时才进行一次轮询。

例如:

  1. 约有3条消息被传递到队列
  2. Spring轮询队列并获取3条消息
  3. 正在处理3条消息,每条消息大约需要20分钟

与此同时,约有25条消息传递到队列中。直到早期传递的3条消息全部完成后,Spring才不会轮询队列。如上面的例子所示,即使有7个线程空闲,Spring也要等待20分钟才开始轮询!

您有什么想法可以控制此轮询?即:如果有任何线程空闲,则应该开始轮询,并且不应等待所有线程都变为空闲状态。


你能够使用 @SqsListener 吗?你是从源代码构建的吗? - sag
我也面临着控制轮询间隔的相同情况。你找到了任何解决方案吗? - jenitshah
2
我们找不到任何选项,退出了Spring并开始直接使用AWS SDK进行队列轮询。在使用AWS SDK进行轮询时,根据可用线程数限制消息数量。 - yottabrain
1个回答

3

您的监听器可以将消息加载到Spring应用程序中,并将它们与AcknowledgementVisibility对象一起提交给另一个线程池(如果您想同时控制这两个对象)。

一旦消息被提交到此线程池,您的监听器可以加载更多的数据。您可以通过调整线程池设置来控制并发性。

您的监听器方法签名将类似于以下内容:

@SqsListener(value = "${queueName}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(YourCustomPOJO pojo,
                   @Headers Map<String, Object> headers,
                   Acknowledgment acknowledgment,
                   Visibility visibility) throws Exception {
...... Send pojo to worker thread and return

一个工作线程会确认处理成功。
acknowledgment.acknowledge().get();

确保您的消息可见性设置为大于您的最长处理时间的值(使用一些超时来限制执行时间)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接