如何在Amazon SQS中将死信队列中的消息自动移回原始队列?
这里还有一段脚本(用 TypeScript 编写),可以将消息从一个 AWS 队列移动到另一个队列,也许对某些人有用。
import {
SQSClient,
ReceiveMessageCommand,
DeleteMessageBatchCommand,
SendMessageBatchCommand,
} from '@aws-sdk/client-sqs'
/** AWS region that appears in both queue URLs. */
const AWS_REGION = 'eu-west-1'
/** Account id that appears in both queue URLs; replace with your own. */
const AWS_ACCOUNT = '12345678901'

/** Builds the URL of the queue called `queueName` from the region and account above. */
const buildQueueUrl = (queueName: string): string =>
  ['https://sqs.' + AWS_REGION + '.amazonaws.com', AWS_ACCOUNT, queueName].join('/')

/** URL of the dead-letter queue. */
const DLQ = buildQueueUrl('dead-letter-queue')
/** URL of the regular queue. */
const QUEUE = buildQueueUrl('queue')
/**
 * Receives a single batch of messages from the dead-letter queue (`DLQ`).
 *
 * The request asks for at most 10 messages with a visibility timeout of
 * 60 seconds. The raw response is logged and returned unchanged, so its
 * `Messages` field may be absent.
 *
 * @returns The response of the `ReceiveMessageCommand`.
 */
const loadMessagesFromDLQ = async () => {
  const sqs = new SQSClient({ region: AWS_REGION })
  const received = await sqs.send(
    new ReceiveMessageCommand({
      QueueUrl: DLQ,
      MaxNumberOfMessages: 10,
      VisibilityTimeout: 60,
    }),
  )

  const loadedCount = received.Messages?.length
  console.log('---------LOAD MESSAGES----------')
  console.log(`Loaded: ${loadedCount}`)
  console.log(JSON.stringify(received, null, 4))

  return received
}
/**
 * Sends a batch of messages to the target queue (`QUEUE`).
 *
 * Every entry is sent with `DelaySeconds: 10`. The batch result is logged.
 *
 * @param entries - Batch entries to send, e.g.
 *   `[{ Id: 'some-unique-id', MessageBody: 'payload' }]`.
 * @throws Error when the response lists at least one failed entry. Callers
 *   that delete the source messages after this call must not do so for
 *   entries that were never accepted; throwing stops them before the delete.
 *   Entries that did succeed may then be sent again on a retry.
 */
const sendMessagesToQueue = async (entries: Array<{Id: string, MessageBody: string}>): Promise<void> => {
  const client = new SQSClient({region: AWS_REGION})
  const command = new SendMessageBatchCommand({
    QueueUrl: QUEUE,
    Entries: entries.map(entry => ({...entry, DelaySeconds: 10})),
  })
  const response = await client.send(command)
  console.log('---------SEND MESSAGES----------')
  console.log(`Send: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
  console.log(JSON.stringify(response, null, 4))

  // A batch request can succeed as a whole while individual entries are
  // rejected; those are only visible in `Failed`, so check it explicitly.
  const failed = response.Failed ?? []
  if (failed.length > 0) {
    const failedIds = failed.map(failure => failure.Id).join(', ')
    throw new Error(
      `Failed to send ${failed.length} of ${entries.length} message(s) to ${QUEUE}: ${failedIds}`,
    )
  }
}
/**
 * Deletes a batch of messages from the dead-letter queue (`DLQ`).
 *
 * The batch result is only logged; entries reported as failed do not raise
 * an error.
 *
 * @param entries - One entry per message to delete, e.g.
 *   `[{ Id: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', ReceiptHandle: 'someReceiptHandle' }]`.
 */
const deleteMessagesFromQueue = async (entries: Array<{Id: string, ReceiptHandle: string}>) => {
  const sqs = new SQSClient({ region: AWS_REGION })
  const outcome = await sqs.send(
    new DeleteMessageBatchCommand({ QueueUrl: DLQ, Entries: entries }),
  )

  const deletedCount = outcome.Successful?.length
  const failedCount = outcome.Failed?.length
  console.log('---------DELETE MESSAGES----------')
  console.log(`Delete: Successful - ${deletedCount}, Failed: ${failedCount}`)
  console.log(JSON.stringify(outcome, null, 4))
}
/**
 * Moves one batch of messages from the DLQ to the target queue.
 *
 * Loads a single batch via `loadMessagesFromDLQ`, sends it with
 * `sendMessagesToQueue` and then removes it from the DLQ with
 * `deleteMessagesFromQueue`. Only one batch is handled per invocation;
 * run the script again to move more.
 *
 * Messages without a `MessageId`, `Body` or `ReceiptHandle` are skipped,
 * because they can be neither re-sent nor deleted.
 */
const run = async (): Promise<void> => {
  const dlqMessageList = await loadMessagesFromDLQ()
  // Treat a missing response, a missing `Messages` field and an empty array
  // alike, so an empty batch is never sent.
  const messages = dlqMessageList?.Messages ?? []
  if (messages.length === 0) {
    console.log('There is no messages in DLQ')
    return
  }

  // Build both lists in one pass so they always describe the same messages.
  const sendMsgList: Array<{Id: string, MessageBody: string}> = []
  const deleteMsgList: Array<{Id: string, ReceiptHandle: string}> = []
  for (const {MessageId, Body, ReceiptHandle} of messages) {
    if (MessageId === undefined || Body === undefined || ReceiptHandle === undefined) {
      console.log(`Skipping incomplete message: ${MessageId}`)
      continue
    }
    sendMsgList.push({Id: MessageId, MessageBody: Body})
    deleteMsgList.push({Id: MessageId, ReceiptHandle})
  }
  if (sendMsgList.length === 0) {
    return
  }

  // Send first, delete second: a failure while sending must leave the
  // messages in the DLQ.
  await sendMessagesToQueue(sendMsgList)
  await deleteMessagesFromQueue(deleteMsgList)
}

// Report failures explicitly and exit non-zero instead of leaving the
// rejection unhandled.
run().catch((error: unknown) => {
  console.error('Moving messages from the DLQ failed:', error)
  process.exitCode = 1
})
附言:这个脚本还有改进的空间,但无论如何可能会有用。
这里有一个简单的 Python 脚本,您可以在命令行(CLI)中使用它来完成相同的操作,仅依赖于 boto3。
用法
python redrive_messages __from_queue_name__ __to_queue_name__
代码
import sys
import boto3
from src.utils.get_config.get_config import get_config
from src.utils.get_logger import get_logger
# Shared boto3 SQS resource; redrive_messages() uses it to look up queues by name.
sqs = boto3.resource('sqs')
# Project configuration and logger objects. Neither is used by the code visible here.
config = get_config()
log = get_logger()
def redrive_messages(from_queue_name: str, to_queue_name: str, wait_time_seconds: int = 1):
    """Move every available message from one SQS queue to another.

    Messages are received one at a time, re-sent to the target queue with
    their body and message attributes, and deleted from the source queue
    only after the send has succeeded. The loop stops at the first empty
    receive response, and the number of moved messages is printed.

    Args:
        from_queue_name: Name of the queue to drain (e.g. the dead-letter queue).
        to_queue_name: Name of the queue that receives the messages.
        wait_time_seconds: Long-polling wait per receive call (0-20). A value
            above 0 avoids the false empty responses of short polling, which
            would end the loop while messages are still in the queue.

    Raises:
        Any boto3/botocore error raised while looking up the queues or while
        receiving, sending or deleting a message. A message whose send failed
        is not deleted.
    """
    # initialize the queues
    from_queue = sqs.get_queue_by_name(QueueName=from_queue_name)
    to_queue = sqs.get_queue_by_name(QueueName=to_queue_name)

    requeued_count = 0
    while True:
        # grab the next message, including all of its message attributes
        messages = from_queue.receive_messages(
            MaxNumberOfMessages=1,
            MessageAttributeNames=['All'],
            WaitTimeSeconds=wait_time_seconds,
        )
        if not messages:
            break
        message = messages[0]

        # requeue it; MessageAttributes is passed only when there is something to pass
        send_kwargs = {'MessageBody': message.body, 'DelaySeconds': 0}
        if message.message_attributes:
            send_kwargs['MessageAttributes'] = message.message_attributes
        to_queue.send_message(**send_kwargs)

        # let the source queue know that the message was processed successfully
        message.delete()
        requeued_count += 1

    print(f'requeued {requeued_count} messages')
if __name__ == '__main__':
    # Two positional arguments are required: source queue name, target queue name.
    # Exit with a usage message instead of an IndexError traceback when they are missing.
    if len(sys.argv) < 3:
        sys.exit(f'usage: python {sys.argv[0]} <from_queue_name> <to_queue_name>')
    from_queue_name = sys.argv[1]
    to_queue_name = sys.argv[2]
    redrive_messages(from_queue_name, to_queue_name)
您可以使用 StartMessageMoveTaskCommand 命令将消息从DLQ重新发送到另一个队列。
import { SQSClient, StartMessageMoveTaskCommand } from "@aws-sdk/client-sqs";
// SQS client. `config` is not defined in this snippet and must be supplied by the surrounding code.
const client = new SQSClient(config);
// Request parameters for the move task; both ARN strings are placeholders to replace.
const input = {
SourceArn: "ARN_OF_SOURCE_DLQ",
DestinationArn: "ARN_OF_DESTINATION",
};
const command = new StartMessageMoveTaskCommand(input);
// Sends the command and waits for the response. This uses top-level `await`.
const response = await client.send(command);