如何在Amazon SQS中自动将消息从DLQ移出?

141
如何在Amazon SQS中将死信队列中的消息自动移回原始队列?

https://github.com/garryyao/replay-aws-dlq 运行得非常好。 - Ulad Kasach
1
另外一个选择是 https://github.com/mercury2269/sqsmover。 - Sergey
这个有什么更新吗?经过相当长的时间后,你对最佳方法有了新的结论吗? - Judy007
2
从2021年12月1日开始,可以使用AWS控制台将消息从DLQ中移出。请参阅此博客文章:https://aws.amazon.com/blogs/aws/enhanced-dlq-management-sqs/ - bszwej
2
这仅适用于标准队列,不适用于FIFO队列。 - ash123
17个回答

1

这里还有一段脚本(用Typescript编写),可以将消息从一个AWS队列移动到另一个队列。也许对某些人很有用。


import {
    SQSClient,
    ReceiveMessageCommand,
    DeleteMessageBatchCommand,
    SendMessageBatchCommand,
} from '@aws-sdk/client-sqs'

const AWS_REGION = 'eu-west-1'
const AWS_ACCOUNT = '12345678901'

const DLQ = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/dead-letter-queue`
const QUEUE = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/queue`

const loadMessagesFromDLQ = async () => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new ReceiveMessageCommand({
        QueueUrl: DLQ,
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 60,
    })
    const response = await client.send(command)

    console.log('---------LOAD MESSAGES----------')
    console.log(`Loaded: ${response.Messages?.length}`)
    console.log(JSON.stringify(response, null, 4))
    return response
}

const sendMessagesToQueue = async (entries: Array<{Id: string, MessageBody: string}>) => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new SendMessageBatchCommand({
        QueueUrl: QUEUE,
        Entries: entries.map(entry => ({...entry, DelaySeconds: 10})),
        // [
        // {
        //     Id: '',
        //     MessageBody: '',
        //     DelaySeconds: 10
        // }
        // ]
    })
    const response = await client.send(command)
    console.log('---------SEND MESSAGES----------')
    console.log(`Send: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
    console.log(JSON.stringify(response, null, 4))
}

const deleteMessagesFromQueue = async (entries: Array<{Id: string, ReceiptHandle: string}>) => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new DeleteMessageBatchCommand({
        QueueUrl: DLQ,
        Entries: entries,
        // [
        //     {
        //         "Id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        //         "ReceiptHandle": "someReceiptHandle"
        //     }
        // ]
    })
    const response = await client.send(command)
    console.log('---------DELETE MESSAGES----------')
    console.log(`Delete: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
    console.log(JSON.stringify(response, null, 4))
}

const run = async () => {
    const dlqMessageList = await loadMessagesFromDLQ()

    if (!dlqMessageList || !dlqMessageList.Messages) {
        console.log('There is no messages in DLQ')
        return
    }

    const sendMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, MessageBody: msg.Body}))
    const deleteMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle}))

    await sendMessagesToQueue(sendMsgList)
    await deleteMessagesFromQueue(deleteMsgList)
}

run()


附言:这个脚本还有改进的空间,但无论如何可能会有用。


1
SQS DLQ重试官方SDK/CLI支持已经发布(请参见这里)。

0
自2021年12月起,通过AWS控制台进行DLQ重试已经可用,但在2023年6月,AWS宣布通过AWS SDK或CLI支持编程式的DLQ重试。
引用上面的文章
为了以编程方式自动化死信队列消息转发工作流程,客户现在可以使用以下操作:
1. StartMessageMoveTask,从死信队列开始一个新的消息转发任务; 2. CancelMessageMoveTask,取消消息转发任务; 3. ListMessageMoveTasks,获取指定源队列的最近10个消息转发任务。

0

虽然这个链接可能回答了问题,但最好在这里包含答案的关键部分,并提供链接作为参考。仅有链接的答案如果链接页面发生变化,可能会变得无效。- 来自评论 - undefined

0

0

这里有一个简单的Python脚本,您可以从cli中使用它来完成相同的操作,仅依赖于boto3

用法

python redrive_messages __from_queue_name__ __to_queue_name__

代码

import sys
import boto3

from src.utils.get_config.get_config import get_config
from src.utils.get_logger import get_logger

sqs = boto3.resource('sqs')

config = get_config()
log = get_logger()

def redrive_messages(from_queue_name:str, to_queue_name:str):
  # initialize the queues
  from_queue = sqs.get_queue_by_name(QueueName=from_queue_name)
  to_queue = sqs.get_queue_by_name(QueueName=to_queue_name)

  # begin querying for messages
  should_check_for_more = True
  messages_processed = []
  while (should_check_for_more):
    # grab the next message
    messages = from_queue.receive_messages(MaxNumberOfMessages=1);
    if (len(messages) == 0):
      should_check_for_more = False;
      break;
    message = messages[0]

    # requeue it
    to_queue.send_message(MessageBody=message.body, DelaySeconds=0)

    # let the queue know that the message was processed successfully
    messages_processed.append(message)
    message.delete()
  print(f'requeued {len(messages_processed)} messages')

if __name__ == '__main__':
  from_queue_name = sys.argv[1]
  to_queue_name = sys.argv[2]
  redrive_messages(from_queue_name, to_queue_name)

0
大多数答案已经过时。现在,您可以使用StartMessageMoveTaskCommand命令将消息从DLQ重新发送到另一个队列。
import { SQSClient, StartMessageMoveTaskCommand } from "@aws-sdk/client-sqs"; 

const client = new SQSClient(config);

const input = {
  SourceArn: "ARN_OF_SOURCE_DLQ",
  DestinationArn: "ARN_OF_DESTINATION",
};

const command = new StartMessageMoveTaskCommand(input);
const response = await client.send(command);

你可以在这里查看文档:https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_StartMessageMoveTask.html

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接