使用Amazon SQS和S3事件的Celery

5
我希望使用Celery来消费由Amazon在SQS上传递的S3事件。然而,S3消息格式Celery预期的格式不符。
我该如何最小化地处理这些消息?我应该编写自定义序列化器吗?还是应该放弃并使用boto或boto3创建自定义桥接?
值得一提的是,我还想将Celery连接到另一个代理(RabbitMQ)以进行应用程序消息传递,如果这很重要的话。

你所提到的Celery消息格式是用于Celery内部通信,以便向工作进程传达要运行的任务和要传递给它的参数。而你所提到的S3消息格式则是用于通知你有关影响S3对象的事件。直接将它们链接在一起是没有意义的——这就像询问是否可以将电线连接到水管以制作电热淋浴器一样。你需要中间的一些机制来进行一些处理。你想要实现什么?当收到S3事件通知时,创建一个系统(使用Celery工作进程)来执行一些工作吗? - scytale
@scytale 确切如此。愚人的任务? - w00t
不是。这是一个合理的问题。 - scytale
4个回答

7
您需要创建一个服务来监听S3通知,然后运行适当的Celery任务。
您有多种选择 - S3通知通过SQS、SNS或AWS Lambda发送。
事实上,最简单的选项可能是根本不使用Celery,而是编写一些代码在AWS Lambda中运行。我没有使用过这个服务(Lambda相对较新),但看起来它意味着您不必运行监控服务或Celery工作程序。

Lambda是我的第一个想法,但后来我开始担心必须处理在Lambda中运行的代码生命周期,以及如何获取RabbitMQ的身份验证令牌。 - w00t
这取决于任务运行的时间有多长,对吧?如果我没记错的话,Lambda 函数的执行时间限制是 15 分钟。 - DejanLekic

5

配置AWS S3事件以调用AWS Lambda函数。该函数应编写为将S3事件消息转换为Celery消息格式,然后将Celery消息发布到SQS。 Celery将从SQS中获取消息。

S3事件 -> Lambda -> SQS -> Celery


我尝试着处理这个结构,但是在参考了这里后使用的消息格式在kombu中会出现TypeError: dispatch() takes at least 2 arguments (1 given)的错误。你有实现过吗?感谢任何帮助。 - akhilsp
2
是的。这里有一个Python实现。https://gist.github.com/rhockenbury/74d176691a4d9e2a84f64cb314910fc6请注意,我使用了v1任务消息协议,但Kombu将检测到并解析为v2。http://docs.celeryproject.org/en/latest/internals/protocol.html - Ryler Hockenbury

3

针对我的具体用例,最简单的方法是创建一个桥接工人(worker),该工人定期轮询(SQS)并使用默认代理将任务交给Celery。

这并不难做到(尽管boto和SQS需要更多文档支持),而Celery不适合同时连接两个不同的代理,因此感觉这是实现的最佳方式。


1
亚马逊S3发送事件通知的消息是以JSON格式发送的。因此,您可以配置celery来序列化JSON。以下是我的配置文件中的摘录(使用django)。
# AWS Credentials
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# Celery
BROKER_URL = "sqs://%s:%s@" % (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_DEFAULT_QUEUE = '<queue_name>'
CELERY_RESULT_BACKEND = None # Disabling the results backend

BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-west-2',
    'polling_interval': 20,
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接