SQS FIFO 使用 MessageGroupId 接收消息

4

如何使用messagegroupid参数仅接收标记有所需ID的队列消息?

我一直在尝试使用下面的代码行进行检索,但它总是会接收来自其他组ID的所有队列消息。

List<Message> messages = sqs.receiveMessage(receiveMessageRequest.withAttributeNames("MessageGroupId")).getMessages();

应该如何正确操作?

2个回答

5

ReceiveMessageRequest 不用于基于消息属性的过滤。如果查看 ReceiveMessageRequest.html.withAttributeNames() 的文档,它会说:

需要与每个消息一起返回的属性列表。

总体来说,您无法过滤从 SQS 返回的消息。您可以限制数量,但不能说:“给我所有符合此模式的消息。”


谢谢回复!这是否意味着我应该有两个单独的队列来处理具有不同消息组ID的消息?这样我就不会收到其他组ID的消息了。 - JustStarted
@JustStarted - 那将是解决问题的非常简单的方法。通常很容易创建队列,而能够分区数据可以简化设计。 - stdunbar

0
我的解决方案是利用 ChangeMessageVisibilityBatchRequest(请参考文档),基本上是将消息发送回队列以进行重新处理。
我的 Lambda 具有基于时间的触发器。每次启动时,我会收到消息,直到没有更多消息为止。对于每个批次,我按 MessageGroupId 对消息进行分组,处理并删除第一组,并将其余组的消息发送回队列,以便在下一次迭代期间接收。
以下是我的代码要点:
(请注意,_awsSqsService.SendBackToQueueAsync(groupMessages) 方法最终通过 AWS SQS ChangeMessageVisibilityBatchRequest 将消息发送回队列。)
public async Task Run()
{
    var batchContainsMessages = true;
    while (batchContainsMessages)
    {
        var messageBatch = await _awsSqsService.GetMessageBatchAsync();
        if(messageBatch.Messages.Count > 0 && messageBatch.HttpStatusCode == HttpStatusCode.OK)
        {
            await ProcessMessageBatchAsync(messageBatch.Messages);
        }
        else
        {
            batchContainsMessages = false;
        }
    }
}

private async Task ProcessMessageBatchAsync(List<Message> messages)
{
    // SQS fifo queues will often return a batch of messages with different MessageGroupIds.
    // Because of this, we need to group them ourselves, process one group (the first group), 
    // and send the rest back to the queue to be processed in the next iteration. 
    // This ensures that we process as many messages as possible per group in a single batch (max is 10)
    var messageGroups = GetMessageGroups(messages);

    var isFirstGroup = true;
    foreach (var group in messageGroups)
    {
        var groupId = Int32.Parse(group.Key);
        var groupMessages = group.Value;
        if (isFirstGroup)
        {
            isFirstGroup = false;
            await ProcessMessagesAsync(groupId, groupMessages);
            await _awsSqsService.DeleteMessagesAsync(groupMessages);
        }
        else
        {
            await _awsSqsService.SendBackToQueueAsync(groupMessages);
        }
    }
}

1
我有这样一种情况,即我只想在每批处理组中的最新消息。可能会尝试类似这样的方法,谢谢!您遇到了什么问题吗? - Tristan Perotti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接