将参数传递给自定义的celery任务

5

我想知道如何正确地向自定义任务装饰器传递参数。例如,我发现可以通过以下方式对celery任务进行子类化:

class MyTask(celery.Task):
    def __init__(self):
        # some custom stuff here
        super(MyTask, self).__init__()

    def __call__(self, *args, **kwargs):
        # do some custom stuff here
        res = self.run(*args, **kwargs)
        # do some more stuff here
        return res

并按以下方式使用:

@MyTask
def add(x, y):
    return x + y

但是我希望能够传递一个参数给这个任务,并根据该参数的不同(或等效地,基于它所装饰的函数的不同)而使其行为不同。我可以想到两种方法来实现这一点。其中一种是通过向celery任务包装器传递额外的kwarg并显式指定基础项来实现,如下所示:

@celery.task(base=MyTask, foo="bar")
def add(x, y):
    return x + y

我可以在我的自定义任务中通过self.foo访问它,但这让我感觉有点像作弊。另一种方法是检查self.task,并根据其值更改行为,但这似乎也有些过度。理想情况下,我希望直接将kwarg传递给自定义任务类,

@MyTask(foo="bar")
def add(x, y):
    return x + y

当然,这会创建一个MyTask实例,但我们既不想要它,也无法正常工作。
有关正确的做法有什么建议吗?

1
你也在尝试做同样的事情吗?你找到解决方案了吗? - nbeuchat
1个回答

1
您可以使用类成员而不是实例成员。因此,您可以在 MyTask 中在 __init__ 之外定义参数,如下所示。然后,您可以将此类用作Celery任务的基类,并将这些新类成员用作自定义任务的参数。
注意:不幸的是,您无法在 __init__ 中传递它们,因为您需要在装饰时实例化 MyTask
class MyTask(celery.Task):
    foo = "default"

    def __init__(self):
        # some custom stuff here
        super(MyTask, self).__init__()

    def __call__(self, *args, **kwargs):
        # do some custom stuff here
        print(self.foo)
        res = self.run(*args, **kwargs)
        # do some more stuff here
        return res

您可以使用以下代码:

@celery.task(base=MyTask, foo="bar")
def add(x, y):
    return x + y

这并没有达到我想要的目的。你已经可以通过 @celery.task(base=MyTask, foo="bar") 装饰器在基类中访问 "foo" 了。 - Shaun
@Shaun,您能具体说明一下“根据其值更改行为”的含义吗?在我的回答中,您可以在__call__函数中执行某些操作(甚至更改/删除/添加args或kwargs)的前面或后面。 - nbeuchat
顺便提一下,在 MyTask 中添加 foo = "default" 的想法只是为了在 MyTask 中拥有一个默认值。最重要的部分实际上是你在 __call__ 中对它的处理。 - nbeuchat
我概述了你在我的问题中发布的方法。但我正在寻找一种更直接的方式将其传递给自定义类。 - Shaun

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接