在SQS队列中确认消息

6
我正在使用Java EE 7和Amazon SQS-JMS Java库与Amazon SQS。我的目标是在收到消息后,根据应用程序的业务逻辑确认(消耗)消息或将其重新发送到队列,并在3次失败重试后将其移动到DLQ。
我考虑使用JMS中的CLIENT_Acknowledge模式,并仅确认成功处理的消息,但这是他们官方文档中的说明:
“在此模式下,当确认消息时,所有在此消息之前接收到的消息都会被隐式确认。例如,如果接收了10条消息,并且仅确认第10条消息(按照接收消息的顺序),则前面的9条消息也会被确认。”
该示例似乎也证实了这一点:http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/code-examples.html#example-synchronous-receiver-client-acknowledge-mode
对我来说,这种行为有点奇怪,并且与我对客户端确认的期望相反。这里是否有比手动将消息发送到主SQS队列或DLQ更优雅的解决方案,具体取决于处理状态?

你为什么想要重新发送消息到队列中?一条消息应该被消费并删除,或者被保留不变。你希望通过“确认”一条消息来实现什么? - John Rotenstein
1
@JohnRotenstein,由于业务逻辑的原因,当我首先收到事件B,然后是A时,我想将B返回到队列中,在处理完事件A后再进行处理。此外,我希望在重试3次失败后将消息移动到DLQ,因此将消息返回到队列是有意义的。 - Bojan Trajkovski
1
SQS可以提供自己的DLQ逻辑。此外,如果您使用SQS FIFO队列,它可以保证顺序。鉴于这些内置功能,我建议您不要重新处理消息--只需将其取出,处理,然后删除即可。 - John Rotenstein
2个回答

4
您可以使用:

UNORDERED_ACKNOWLEDGE

SQSSession.UNORDERED_ACKNOWLEDGE

它来自于 'com.amazon.sqs.javamessaging;',根据文档中的说明,它是Client_Acknowledge的一个变体,仅确认调用它的消息。
 /**
 * Non standard acknowledge mode. This is a variation of CLIENT_ACKNOWLEDGE
 * where Clients need to remember to call acknowledge on message. Difference
 * is that calling acknowledge on a message only acknowledge the message
 * being called.
 */

依赖示例:

"com.amazonaws:amazon-sqs-java-messaging-lib:1.0.3"

2
非常感谢,一个更好的名称会节省很多时间。 - lkamal

3
为处理这种情况,您可以使用您创建的DLQ的RedrivePolicy属性。此情况的解决方案如下:
  • 创建2个sqs队列 my_qmy_q_dl(后者用于DLQ)
  • 使用RedrivePolicy将DLQ my_q_dl 设置为my_q的DLQ。
  • 在此处,应注意指定deadLetterTargetArnmaxReceiveCount。这个maxReceiveCount是您想要在将消息发送到DLQ之前处理任何消息而不进行确认的次数。如果你设置了maxReceiveCount=3,则该消息将留在my_q中直到消费者第三次拉取时没有ack。 有两种情况:
    • 正常情况:收到ack后,消息将被删除。
    • 如果在第三次pull之前没有ack(消息删除),则该消息将从my_q中删除并推送到my_q_dl本身。

*RedrivePolicy* - 包括源队列的死信队列功能的参数的字符串。

deadLetterTargetArn - 在超过maxReceiveCount的值后,Amazon SQS将消息移动到的死信队列的Amazon资源名称(ARN)。

maxReceiveCount - 消息被传递到源队列的次数,然后被移动到死信队列。

注意:FIFO队列的死信队列也必须是FIFO队列。同样,标准队列的死信队列也必须是标准队列。


1
那么当RedrivePolicy有多个属性时,如何将其格式化为字符串?作为JSON字符串。 SetQueueAttributesRequest请求= new SetQueueAttributesRequest() .withQueueUrl(src_queue_url) .addAttributesEntry("RedrivePolicy", "{"maxReceiveCount":"5", "deadLetterTargetArn":""
  • dl_queue_arn + ""}");
- Wpigott

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接