SQS消息始终处于运行状态

6
我有以下代码从SQS队列中检索消息。 我使用AmazonSQSBufferedAsyncClient从队列中检索消息。 固定延迟的SingleThreadedExecutor每5分钟唤醒一次,调用receiveMessage。 队列启用了长轮询。
@Service
public class AmazonQueueService
    implements QueueService<String> {

    @Autowired
    private AmazonSQSBufferedAsyncClient sqsAsyncClient;

    @Value("${aws.sqs.queueUrl}")
    private String queueUrl;

    @Override
    public List<Message<String>> receiveMessage() {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
        ReceiveMessageResult result = sqsAsyncClient.receiveMessage(receiveMessageRequest);

        LOG.debug("Size=" + result.getMessages().size());

        return Lists.transform(result.getMessages(), ......);
    }

    .....
}

问题在于当我检查AWS控制台时,消息始终处于in-flight状态,但应用程序永远不会接收到该消息(大小始终为0)。看起来AmazonSQSBufferedAsyncClient正在从队列中读取消息,但在receiveMessage调用中没有返回。
有什么想法吗?

AmazonSQSBufferedAsyncClient 如何配置(超时、线程等)?队列如何配置,关于消息的可见性超时?或者在发送消息时是否指定了可见性超时? - Bruno Reis
刚刚发生了一件事:你似乎在使用某种依赖注入(@Autowired)。你有可能会创建多个 AmazonSQSBufferedAsyncClient 的实例,而不是一个单例实例,它被所有 AmazonQueueService 类的实例共享吗?如果是这种情况,它们将会竞争消息。你可以尝试打印 AmazonSQSBufferedAsyncClient.hashCode() 来确保你只有一个实例... - Bruno Reis
我非常确定只有一个 `AmazonSQSBufferedAsyncClient' 的实例。队列属性如下: - vijay
我非常确定只有一个`AmazonSQSBufferedAsyncClient'的实例。队列属性如下:可见超时时间:2分钟;保留期:4天;接收消息等待时间:20秒。在发出请求时没有设置任何属性,因此应默认使用队列属性。 - vijay
4个回答

7

终于搞清楚了。这个问题是由队列可见性超时(2分钟)和scheduledExecutor延迟(5分钟)的组合引起的。

将可见性超时增加到15分钟解决了问题。

我的理论是 -> AmazonSQSBufferedAsyncClient检索消息并将其保留在缓冲区中等待receiveMessage调用。由于执行器延迟为5分钟,因此消息的可见性在调用receiveMessage之前就会超时,并将消息返回到队列中。它看起来也几乎立即从队列中选择了该消息。现在由于某种原因,receiveMessage调用不接收消息。增加超时时间给了receiveMessage调用在超时事件之前发生的机会,解决了问题,我想。

还有其他可能的解释吗?


3

对我来说,问题在于我的消息被Lambda读取,甚至在我在控制台上看到它们之前。结果是我无法从SQS控制台轮询消息。


1
天啊,非常感谢!我一直在本地工作,没意识到我已经进入自动驾驶模式,并创建了一个Lambda实际上正在消费SQS消息。我的妈呀,当时我真的很困惑。 - rmolinamir

2
当你完成消息处理后,必须从队列中删除该消息。如果不这样做,它将保持在传输状态,直到超时然后回到队列。这是为了确保您永远不会丢失消息而设计的。如果您的程序在处理和删除消息之前崩溃,该消息将立即返回队列。
从基本的Java示例SampleDriver.java)中:
QMessage message = messages.get(0);

System.out.println("\nMessage received");
System.out.println("  message id:     " + message.getId());
System.out.println("  receipt handle: " + message.getReceiptHandle());
System.out.println(" message content: " + message.getContent());

testQueue.deleteMessage(message.getReceiptHandle()); // <===== here

是的。但删除操作至少需要接收和处理一次消息后才能执行,我没有从sqsAsyncClient.receiveMessage调用中接收到消息。 - vijay
1
啊。如果它是在运行中的话,那么问题可能出现在客户端。有可能是因为您有另一个客户端收到了消息但未将其删除,或者 Java 调用存在问题。您是否尝试过像 Python boto 这样的其他库? - kichik

0
在我的情况下,我使用的是短轮询,有时会接收到多条消息,但我只处理了一条。因此丢失了部分消息的踪迹。由于消息从未被处理,因此它的可见性超时时间从未到达,消息就会出现在“正在处理的消息”中。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接