持续监听 AWS SQS 消息的模式

30

我有一个简单的类名叫做QueueService,其中包含一些方法,这些方法包装了AWS SQS SDK for Java中的方法。例如:

public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
        List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();

        ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
        for(Message message : messages) {
            Hashtable<String, String> resultItem = new Hashtable<String, String>();
            resultItem.put("MessageId", message.getMessageId());
            resultItem.put("ReceiptHandle", message.getReceiptHandle());
            resultItem.put("Body", message.getBody());
            resultList.add(resultItem);
        }
        return resultList;
    }

我有另一个名为App的类,它具有main并创建了QueueService的实例。

我正在寻找一种“模式”,使得App中的main可以监听队列中的新消息。现在我有一个while(true)循环,在其中调用receiveMessages方法:

while(true) {
            messages = queueService.receiveMessages(queueURL); 
            for(Hashtable<String, String> message: messages) {
                String receiptHandle = message.get("ReceiptHandle");
                String messageBody = message.get("MessageBody");
                System.out.println(messageBody);
                queueService.deleteMessage(queueURL, receiptHandle);
            }
        }

这是正确的方式吗?我应该在 SQS SDK 中使用异步消息接收方法吗?

6个回答

35
据我所知,Amazon SQS没有支持主动监听模型的方式,即Amazon SQS不会“推送”消息到您的监听器,也不会在有消息时调用您的消息监听器。因此,您总是需要轮询消息。有两种轮询机制可用于轮询-短轮询和长轮询。每种机制都有其优缺点,但通常情况下您会使用长轮询,尽管默认机制是短轮询。长轮询机制在网络流量方面肯定更有效率,更具成本效益(因为Amazon按请求次数收费),并且在您希望尽快处理消息时也是首选机制。关于长轮询和短轮询还有更多细节值得了解,这里很难简述所有内容,但如果您愿意,可以通过以下博客阅读更多详细信息。它还有一些代码示例,应该会有所帮助。

http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/

关于while(true)循环,我认为这取决于具体情况。如果你使用长轮询(Long Polling),并且可以将等待时间设置为最多20秒,那么当没有消息时,你就不需要频繁轮询SQS。如果有消息,你可以决定是经常轮询(以便尽快处理到达的消息),还是在时间间隔内始终处理它们(比如每n秒)。
另一个要注意的点是,你可以在单个receiveMessages请求中读取多达10条消息,因此这也会减少你对SQS的调用次数,从而降低成本。正如上面的博客详细解释的那样,你可以请求读取10条消息,但即使队列中有这么多消息,它也可能不会返回你10条。
总的来说,我认为你需要构建适当的钩子和异常处理程序,在运行时关闭轮询(如果你正在使用while(true)这种结构)。
另一个要考虑的方面是,你是否希望在主应用程序线程中轮询SQS,还是想要生成另一个线程。因此,另一种选择是在主线程中创建一个只有一个线程的ScheduledThreadPoolExecutor来定期轮询SQS(每隔几秒钟),这样你可能就不需要while(true)结构了。

4
这个说法在2017年是正确的,不过在2018年6月28日,从SQS触发Lambda函数变得可行:https://cabbagetech.blog/2018/08/09/configure-aws-sqs-messages-to-trigger-lambda-function/ - pzrq
@pzrq 值得注意的是,如果您正在使用 SQS FIFO 队列,则无法从 SQS 触发 Lambda。 - Adépòjù Olúwáségun
1
自2019年11月起,Lambda也支持从SQS FIFO触发,但它们不能保证仅传递一次。https://aws.amazon.com/blogs/compute/new-for-aws-lambda-sqs-fifo-as-an-event-source/ - César

2
您缺少以下几点:
  • 使用receiveMessages(ReceiveMessageRequest)并设置等待时间以启用长轮询。
  • 将您的AWS调用包装在try / catch块中。特别是要注意OverLimitException,如果您有太多未完成的消息,则可以从receiveMessages()抛出该异常。
  • while循环的整个主体包装在自己的try / catch块中,记录捕获的任何异常(不应该有 - 这里是为了确保您的应用程序不会因为AWS更改其API或您忽略处理预期异常而崩溃)。

有关长轮询和可能的异常的更多信息,请参见文档

至于使用异步客户端:您有什么特殊原因要使用它吗?如果没有,则不要使用:单个接收器线程更容易管理。


使用异步客户端的原因是因为将来在AWS Lambda中托管的应用程序(.jar)将使用SQS服务包装器来检查队列中是否有新消息需要处理。由于Lambda按事件触发或函数调用收费,我认为只有在有新消息时才执行监听器(使用sqs + jws)会更便宜。如果我在这一点上错了,请您纠正我一下。 - Francisco Hanna
异步客户端唯一的作用就是将每个非异步请求封装在一个“Callable”中,然后在内部线程池上运行它。我不确定“SQS服务包装器”是什么,但是SQS目前不是受支持的事件源。大多数人使用SNS,有些人使用Kinesis。 - kdgregory
使用“SQS服务包装器”一词,我指的是我创建的一个类,它封装了SQS SDK的某些方法。我已经找到了一种设置监听器的方法,当队列中有新消息时触发该监听器。我应该在文档中深入挖掘。 - Francisco Hanna

0

0

如果你想使用SQS来处理请求,然后再使用Lambda进行处理,可以按照link中给出的步骤进行操作,或者你也可以直接使用Lambda替代SQS,并对每个请求调用Lambda。


谢谢。我已经阅读了那篇文章,它使用了一个定时计划与Cloudwatch触发Lambda函数来轮询队列。我试图避免使用时间调度器。 - Francisco Hanna
2
今年,SQS可以触发Lambda函数。 - Chazt3n
1
@Chazt3n 如果你想添加一个答案,我可以接受它,因为SQS触发Lambda正是我目前所需要的 :) - Francisco Hanna

0

0
几年后了,但以防有人搜索这个主题,我发布我的解决方案。我不确定这是否是最好的方法,但对我们有效。请注意,这使用项目反应堆。还请注意,此解决方案用于不太时间关键的消息 - 几分钟的延迟对我们来说可以接受。
package myapp.amazonsqs;

import static org.apache.logging.log4j.LogManager.getLogger;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;

public class MyAmazonSqsMessagingGateway {

    private static final Logger LOGGER = getLogger(MyAmazonSqsMessagingGateway.class);
    private static final long POLLING_PERIOD_SECONDS = 30L;
    // max 20
    private static final int POLL_WAIT_TIME_SECONDS = 20;
    private static final long MINIMUM_RETRY_PERIOD_SECONDS = 30L;
    private final String amazonAwsRegion;
    private final String amazonAwsAccessKeyId;
    private final String amazonAwsAccessKeySecret;
    private final String queueName;
    private AmazonSQS amazonSqsClient;

    public MyAmazonSqsMessagingGateway(
            final String amazonAwsRegion,
            final String amazonAwsAccessKeyId,
            final String amazonAwsAccessKeySecret,
            final String queueName
    ) {
        this.amazonAwsRegion = amazonAwsRegion;
        this.amazonAwsAccessKeyId = amazonAwsAccessKeyId;
        this.amazonAwsAccessKeySecret = amazonAwsAccessKeySecret;
        this.queueName = queueName;
    }

    public void init() {
        this.amazonSqsClient = createClient();
        start();
    }

    private AmazonSQS createClient() {
        return AmazonSQSClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        this.amazonAwsAccessKeyId,
                        this.amazonAwsAccessKeySecret
                )))
                .withRegion(Regions.fromName(this.amazonAwsRegion))
                .build();
    }

    private void start() {
        LOGGER.debug("Starting..");
        final String queueUrl = getAndCheckMessageQueueUrl();
        final Duration initialDelay = Duration.ofSeconds(1L);
        final Duration pollingPeriod = Duration.ofSeconds(POLLING_PERIOD_SECONDS);
        final Duration minimumRetryPeriod = Duration.ofSeconds(MINIMUM_RETRY_PERIOD_SECONDS);
        // retry indefinitely with backoff, until this application is stopped
        final long maxNumberOfRetryAttempts = Long.MAX_VALUE;
        Flux.interval(initialDelay, pollingPeriod)
                .map(ignoredParameter -> receiveMessages(this.amazonSqsClient, queueUrl))
                .retryWhen(Retry
                        .backoff(maxNumberOfRetryAttempts, minimumRetryPeriod)
                        .doBeforeRetry(retrySignal -> LOGGER.warn(
                                "Exception when receiving messages, retrying.. ",
                                retrySignal.failure()
                        ))
                        .doAfterRetry(retrySignal -> LOGGER.debug("Retry complete."))
                )
                .subscribe(
                        receiveMessageResult -> receiveMessageResult
                                .getMessages()
                                .forEach(this::handleMessage),
                        throwable -> LOGGER.error(
                                "Non-recoverable exception when receiving messages from Amazon SQS: ",
                                throwable
                        )
                );
        LOGGER.debug("Start completed.");
    }

    private ReceiveMessageResult receiveMessages(final AmazonSQS amazonSqsClient, final String queueUrl) {
        LOGGER.debug("Receiving messages...");
        return amazonSqsClient.receiveMessage(new ReceiveMessageRequest(
                queueUrl).withWaitTimeSeconds(POLL_WAIT_TIME_SECONDS)
                .withMaxNumberOfMessages(10));
    }

    private String getAndCheckMessageQueueUrl() {
        final String queueUrl = amazonSqsClient
                .getQueueUrl(this.queueName)
                .getQueueUrl();
        if (queueUrl == null) {
            throw new IllegalStateException("queueUrl is null, cannot run!");
        } else {
            LOGGER.info(() -> String.format("Listening in queue %s", queueUrl));
        }
        return queueUrl;
    }

    private void handleMessage(final Message message) {
        logMessage(message);
        // do something useful with the message here.
    }

    private static void logMessage(final Message message) {
        if (LOGGER.isDebugEnabled()) {
            final Map<String, String> attributes = message.getAttributes();
            final String attributesAsSingleString = attributes
                    .keySet()
                    .stream()
                    .map(key -> "Attribute " + key + " value = " + attributes.get(key))
                    .collect(Collectors.joining("\n"));
            LOGGER.debug("Message received! id = "
                    + message.getMessageId()
                    + "\nreceipt handle = "
                    + message.getReceiptHandle()
                    + "\n"
                    + attributesAsSingleString
                    + "body:\n"
                    + message.getBody());
        }
    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接