如何在Amazon SQS中将死信队列中的消息自动移回原始队列?
这是一种快速的 hack。但这绝不是最佳或推荐的选项。
有一些可用的脚本可以帮助您完成此操作:
# install
npm install replay-aws-dlq;
# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source
# use
sqsmover -s [source_queue_url] -d [dest_queue_url]
npx replay-aws-dlq DL_URI MAIN_URI
。 - Vasyl Boroviak不需要移动消息,因为这样会带来很多其他挑战,比如重复的消息、恢复场景、丢失的消息、去重检查等。
我们实施的解决方案如下:
通常,我们将DLQ用于瞬态错误,而不是永久性错误。所以采用以下方法:
像普通队列一样从DLQ读取消息
优点然后按照常规队列的方式执行相同的代码。
在处理过程中,在终止作业或进行处理的过程被终止的情况下更可靠(例如,实例被杀死或进程终止)
优点扩展消息的可见性,使其他线程无法处理它们。
优点仅当存在永久性错误或成功时才删除消息。
优点conf = {
"sqs-access-key": "",
"sqs-secret-key": "",
"reader-sqs-queue": "",
"writer-sqs-queue": "",
"message-group-id": ""
}
import boto3
client = boto3.client(
'sqs',
aws_access_key_id = conf.get('sqs-access-key'),
aws_secret_access_key = conf.get('sqs-secret-key')
)
while True:
messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)
if 'Messages' in messages:
for m in messages['Messages']:
print(m['Body'])
ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
print(ret)
client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
else:
print('Queue is currently empty or messages are invisible')
break
message_group_id
字段。MessageDeduplicationId
,或者使队列启用ContentBasedDeduplication
。您可以重新使用DLQ中的消息中的MessageDeduplicationId
。 - Sapience看起来这是您最好的选择。如果在第二步之后您的进程失败,那么您将复制两次该消息,但是您的应用程序应该处理消息的重新传递(或者不关心)。
在这里:
import boto3
import sys
import Queue
import threading
work_queue = Queue.Queue()
sqs = boto3.resource('sqs')
from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)
from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)
def process_queue():
while True:
messages = work_queue.get()
bodies = list()
for i in range(0, len(messages)):
bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})
to_q.send_messages(Entries=bodies)
for message in messages:
print("Coppied " + str(message.body))
message.delete()
for i in range(10):
t = threading.Thread(target=process_queue)
t.daemon = True
t.start()
while True:
messages = list()
for message in from_q.receive_messages(
MaxNumberOfMessages=10,
VisibilityTimeout=123,
WaitTimeSeconds=20):
messages.append(message)
work_queue.put(messages)
work_queue.join()
'''
This script is used to redrive message in (src) queue to (tgt) queue
The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set Source Queue's redrive policy, Maximum Receives to 1.
Also set Source Queue's VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.
Source Queue's Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--src', required=True,
help='Name of source SQS')
parser.add_argument('-t', '--tgt', required=True,
help='Name of targeted SQS')
args = parser.parse_args()
return args
def verify_queue(queue_name):
queue_url = sqs.get_queue_url(QueueName=queue_name)
return True if queue_url.get('QueueUrl') else False
def get_queue_attribute(queue_url):
queue_attributes = sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=['All'])['Attributes']
print(queue_attributes)
return queue_attributes
def main():
args = parse_args()
for q in [args.src, args.tgt]:
if not verify_queue(q):
print(f"Cannot find {q} in AWS SQS")
src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']
target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
target_queue_attributes = get_queue_attribute(target_queue_url)
# Set the Source Queue's Redrive policy
redrive_policy = {
'deadLetterTargetArn': target_queue_attributes['QueueArn'],
'maxReceiveCount': '1'
}
sqs.set_queue_attributes(
QueueUrl=src_queue_url,
Attributes={
'VisibilityTimeout': '5',
'RedrivePolicy': json.dumps(redrive_policy)
}
)
get_queue_attribute(src_queue_url)
# read all messages
num_received = 0
while True:
try:
resp = sqs.receive_message(
QueueUrl=src_queue_url,
MaxNumberOfMessages=10,
AttributeNames=['All'],
WaitTimeSeconds=5)
num_message = len(resp.get('Messages', []))
if not num_message:
break
num_received += num_message
except Exception:
break
print(f"Redrive {num_received} messages")
# Reset the Source Queue's Redrive policy
sqs.set_queue_attributes(
QueueUrl=src_queue_url,
Attributes={
'VisibilityTimeout': '30',
'RedrivePolicy': ''
}
)
get_queue_attribute(src_queue_url)
if __name__ == "__main__":
main()
还有另一种方法可以在不写任何代码的情况下实现此目标。假设您实际的队列名称是SQS_Queue,与之配对的DLQ为SQS_DLQ。现在请按照以下步骤进行操作:
aws sqs receive-message --queue-url <url of DLQ> --max-number-of-messages 10
。由于您可以读取的最大消息数限制为 10 条,因此建议像这样循环运行命令:for i in {1..1000}; do <CMD>; done
。 - Patrick Finnigan