如何在Amazon SQS中自动将消息从DLQ移出?

141
如何在Amazon SQS中将死信队列中的消息自动移回原始队列?

https://github.com/garryyao/replay-aws-dlq 运行得非常好。 - Ulad Kasach
1
另外一个选择是 https://github.com/mercury2269/sqsmover。 - Sergey
这个有什么更新吗?经过相当长的时间后,你对最佳方法有了新的结论吗? - Judy007
2
从2021年12月1日开始,可以使用AWS控制台将消息从DLQ中移出。请参阅此博客文章:https://aws.amazon.com/blogs/aws/enhanced-dlq-management-sqs/ - bszwej
2
这仅适用于标准队列,不适用于FIFO队列。 - ash123
17个回答

165

这是一种快速的 hack。但这绝不是最佳或推荐的选项。

  1. 将主 SQS 队列设置为实际 DLQ 的 DLQ,最大接收次数为 1。
  2. 查看 DLQ 中的内容(这将使消息移动到主队列中,因为这是实际 DLQ 的 DLQ)
  3. 取消设置,使主队列不再是实际 DLQ 的 DLQ

19
是的,这很像一个黑客方式 - 但如果你知道自己在做什么并且没时间用正规方法解决,这是一个不错的快速修复选择 #yolo - Thomas Watson
15
当你这样做时,接收计数并不会被重置为0。请注意。 - Rajdeep Siddhapura
2
正确的方法是在SQS中配置Redrive策略,设置最大接收计数,当消息超过设置的接收计数时,它将自动将消息移动到DLQ,然后编写一个读取器线程从DLQ中读取消息。 - Ash
5
你是个天才。 - JefClaes
3
几个月前,我创建了一个 CLI 工具来解决这个问题:https://github.com/renanvieira/phoenix-letter - MaltMaster
显示剩余6条评论

58

9
注意:目前控制台仅支持标准队列的DLQ重发功能。 - Ahmad Nabil

44

有一些可用的脚本可以帮助您完成此操作:

# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url] 

3
这是最简单的方法,不同于被接受的答案。只需从已设置AWS环境变量属性的终端运行此命令:npx replay-aws-dlq DL_URI MAIN_URI - Vasyl Boroviak

安装

npm install replay-aws-dlq;
- Lee Oades
这对我来说完美无缺(注意,我只尝试了基于Go的那个)。它似乎分阶段移动消息而不是一次性全部移动(这是一件好事),甚至还有进度条。在我看来比被接受的答案更好。 - Eugene Ananin
有一篇最近的AWS博客文章使用Lambda来完成给定的任务。它也发布在AWS无服务器应用程序存储库中:https://aws.amazon.com/blogs/compute/using-amazon-sqs-dead-letter-queues-to-replay-messages/(我还没有尝试过,因为我会选择上面的快速hack,但这似乎是正确的方法) - t-h-
很好地运行了,在AWS CloudShell中执行了NPM命令。 - Milan Gatyás
该项目在Github上的活动很少,而且似乎无法正确地回放FIFO(它修改了重复ID和组ID):( - Stephane

17

不需要移动消息,因为这样会带来很多其他挑战,比如重复的消息、恢复场景、丢失的消息、去重检查等。

我们实施的解决方案如下:

通常,我们将DLQ用于瞬态错误,而不是永久性错误。所以采用以下方法:

  1. 像普通队列一样从DLQ读取消息

    优点
    • 避免重复的消息处理
    • 更好地控制DLQ-比如我设置了一个检查,只在常规队列完全处理时才进行处理。
    • 根据DLQ上的消息扩展进程
  2. 然后按照常规队列的方式执行相同的代码。

  3. 在处理过程中,在终止作业或进行处理的过程被终止的情况下更可靠(例如,实例被杀死或进程终止)

    优点
    • 代码可重用性
    • 错误处理
    • 恢复和消息重播
  4. 扩展消息的可见性,使其他线程无法处理它们。

    优点
    • 避免多个线程处理相同的记录。
  5. 仅当存在永久性错误或成功时才删除消息。

    优点
    • 保持处理,直到我们遇到瞬态错误。

我非常喜欢你的方法!在这种情况下,你如何定义“永久错误”? - DMac the Destroyer
任何大于HTTP状态码>200 <500的内容都是永久性错误。 - Ash
2
这确实是生产中不错的方法。然而我认为此帖子只是在询问如何将DLQ中的消息重新发布到正常队列中,如果您知道该怎么做,有时会很方便。 - linehrr
这就是我所说的,你不应该这样做。因为如果你这样做,那么会产生更多的问题。我们可以像推送任何其他消息一样移动消息,但将失去DLQ功能,如接收计数、可见性等。它将被视为新消息。 - Ash

9
我编写了一个小型的Python脚本,使用boto3库实现此功能:
conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

你可以在这个链接中获取此脚本。
该脚本基本上可以在任意队列之间移动消息,并且它支持先进先出(FIFO)队列,同时你也可以提供message_group_id字段。

1
对于先进先出队列(FIFO队列),在发送消息之前,您需要提供MessageDeduplicationId,或者使队列启用ContentBasedDeduplication。您可以重新使用DLQ中的消息中的MessageDeduplicationId - Sapience

8

看起来这是您最好的选择。如果在第二步之后您的进程失败,那么您将复制两次该消息,但是您的应用程序应该处理消息的重新传递(或者不关心)。


6

在这里:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()

这是Python吗? - carlin.scott
Python2实际上 - Kristof Jozsa

4
DLQ只有在经过多次尝试后原始消费者无法成功消费消息时才会起作用。 我们不想删除该消息,因为我们相信我们仍然可以对其进行一些处理(例如再次尝试处理、记录或收集某些统计信息),而且我们也不想一遍又一遍地遇到这条消息并停止处理其他在其后面的消息的能力。
DLQ本质上只是另一个队列。这意味着我们需要为DLQ编写一个消费者,该消费者理想情况下应该运行频率比原始队列低,它将从DLQ中消费并将消息重新发送到原始队列,然后从DLQ中删除 - 如果这是预期行为,并且我们认为原始消费者现在已准备好再次处理它。如果这个循环继续进行一段时间,那应该没问题,因为现在我们还有机会手动检查并进行必要的更改,并部署另一个版本的原始消费者,而不会丢失消息(当然,在消息保留期限内 - 默认为4天)。
如果AWS能够提供这种功能,那就太好了,但我还没有看到它 - 他们将这个功能留给最终用户以使用他们认为合适的方式。

3
我们使用以下脚本将消息从源队列重新发送到目标队列:
文件名:redrive.py
用法:python redrive.py -s {源队列名称} -t {目标队列名称}
'''
This script is used to redrive message in (src) queue to (tgt) queue

The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set Source Queue's redrive policy, Maximum Receives to 1. 
Also set Source Queue's VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.

Source Queue's Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    queue_url = sqs.get_queue_url(QueueName=queue_name)
    return True if queue_url.get('QueueUrl') else False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue's Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # read all messages
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue's Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()

3

还有另一种方法可以在不写任何代码的情况下实现此目标。假设您实际的队列名称是SQS_Queue,与之配对的DLQ为SQS_DLQ。现在请按照以下步骤进行操作:

  1. 将SQS_Queue设置为SQS_DLQ的dlq。由于SQS_DLQ已经是SQS_Queue的dlq,现在两者都充当了彼此的dlq。
  2. 将SQS_DLQ的最大接收计数设置为1。
  3. 现在从SQS_DLQ控制台读取消息。由于消息接收计数为1,它将把所有消息发送到其自己的dlq,即您实际的SQS_Queue队列。

这将违背维护DLQ的目的。DLQ旨在在观察到故障时不过载您的系统,以便稍后处理。 - Buddha
1
它肯定会打败目的,你将无法实现其他好处,如扩展、限流和接收计数。此外,您应该使用常规队列作为处理队列,如果消息接收计数达到“N”,则应将其发送到DLQ。这是理想情况下应该配置的方式。 - Ash
3
作为重新推送大量消息的一次性解决方案,这个方法非常实用。然而,不是一个好的长期解决方案。 - nmio
1
是的,这对于重新传递消息(在主队列中修复问题后)作为一次性解决方案非常有价值。在 AWS CLI 中,我使用的命令是:aws sqs receive-message --queue-url <url of DLQ> --max-number-of-messages 10。由于您可以读取的最大消息数限制为 10 条,因此建议像这样循环运行命令:for i in {1..1000}; do <CMD>; done - Patrick Finnigan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接