AWS SQS能否发布到SNS,还是必须轮询SQS?

7
我们目前正在使用AWS构建一个应用程序,需要将消息推送到SQS。我的问题是,是否可以让SQS发布一条消息到SNS,从而触发Lambda(订阅SNS)?然后Lambda需要向SQS返回确认收到消息的信息,从而将该消息从SQS中删除。
上述情况是否可行?还是唯一的方法是通过Lambda轮询队列等方式从SQS获取消息?
非常感谢提供的任何帮助。
对于术语的误用,我表示歉意,因为我对AWS相对较新。

我已经发布了一个答案,告诉大家在200次重新发明更新后现在这是可能的,如果它对你有帮助,请点赞或接受它,以便将来能够帮助其他人。 - Jatin Mehrotra
4个回答

9

SQS无法发布消息到SNS。SQS只能存储消息。您需要使用SQS API拉取消息。

希望这能帮到您!


8

[ 2022/2023 更新 ]

现在可以使用EventBridgePipes.实现此功能。

我刚刚尝试了使用SQS作为源和SNS作为目标,它可以正常工作。


4

-1

对于未来遇到类似问题的人,我在将SQS用作SNS -> Lambda服务的死信后编写了此脚本。虽然代码不是最漂亮的,但它能正常工作。

以下是脚本内容,您也可以在这里找到:https://gist.github.com/joshghent/ca4a1272031e2a52af57d5e8ec5d53c5

sqs_to_sns.py

# Usage
# $ python sqs_to_sns.py my-queue-name

import boto3
import sys
import queue
import threading
from datetime import datetime
import json
from uuid import uuid4

work_queue = queue.Queue()

sqs = boto3.resource('sqs')
sns = boto3.client('sns')
sqs_client = boto3.client('sqs')

from_q_name = sys.argv[1]
print(("From: " + from_q_name + " To: SNS"))

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName='backup-queue')

skipped = 0
processed = 0
total = 0


def process_queue():
    while True:
        messages = work_queue.get()

        global total
        total = len(messages)

        for message in messages:
            message_content = json.loads(message.body)

            message.delete()

            print("Backing up Message to dead letter queue - just in case. Id: " +
                  message_content['MessageId'])
            bodies = list()
            bodies.append({'Id': str(uuid4()), 'MessageBody': message.body})

            to_q.send_messages(Entries=bodies)

            response = sns.publish(
                TopicArn=message_content['TopicArn'], Message=message_content['Message'])
            print(("Published Message to Topic " + str(message_content['MessageId']) +
                   ". To TopicArn: " + message_content['TopicArn'] + ". Received response " + json.dumps(response)))

            global processed
            processed = processed + 1


for i in range(10):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()
print("Processed " + str(processed) + " of " + str(total) +
      ". Skipped " + str(skipped) + " messages. Exiting")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接