使用类方法作为Celery任务

57

我正在尝试将类的方法用作django-celery任务,并使用@task装饰器进行标记。与Anand Jeyahar提出的问题here描述的情况相同。

class A:
    @task
    def foo(self, bar):
        ...

def main():
    a = A()
    ...
    # what i need
    a.foo.delay(bar) # executes as celery task 
    a.foo(bar) # executes locally
问题在于即使我像这样使用类实例 a.foo.delay(bar),它仍然会提示需要至少两个参数,这意味着缺少self指针。

更多信息:

  • 我无法将类转换为模块因为存在继承关系。
  • 方法强烈依赖于类成员,所以我不能将它们设置为静态
  • 使用@task修饰符将标记为任务会使类本身成为任务,并且可以从run()方法中执行方法,使用某些参数作为方法选择的键,但这不是我想要的。
  • 创建一个类的实例并将其作为self参数传递给方法会更改我执行方法的方式,而不是作为celery任务,而是作为普通方法(即在测试时)。
  • 我尝试了解如何在构造函数中动态注册任务,但是Celery在工作进程之间共享代码,因此这似乎是不可能的。

感谢您的帮助!


你如何执行它?相同的示例对我也有效。 - asksol
a = A() a.method(1,2) 或 a.method.delay(1,2) -- 结果相同 - Thorin Schiffer
5个回答

55

希望在未来的版本中能够看到这个! - ThiefMaster
1
@ThiefMaster,这实际上已经在Celery 3.0中了,可以看看celery.contrib.methods:http://docs.celeryproject.org/en/latest/reference/celery.contrib.methods.html - asksol
啊,很好,考虑更新你的答案以包含这个。 - ThiefMaster
12
请注意:这似乎是因为它太过于有错而无用于 2014 年 10 月被删除 - https://github.com/celery/celery/commit/4f43276c236bbef7239a49b93815f478aec1d9f6。 - Hamy

11

如果您想要编写基于类的任务,Jeremy Satterfield提供了简洁明了的教程。您可以在这里查看。

关键是扩展celery.Task类并包括一个run()方法,就像下面这样:

from celery import Task

class CustomTask(Task):
    ignore_result = True

    def __init__(self, arg):
        self.arg = arg

    def run(self):
        do_something_with_arg(self.arg)

然后像这样运行任务:

your_arg = 3

custom_task = CustomTask()
custom_task.delay(your_arg)

我不确定是否需要ignore_result = True这部分。


我尝试了这个,但它没有起作用,它抛出了这个错误 - AttributeError:FileTask实例没有'delay'属性。 - Sheesh Mohsin
1
不应该在基于类的任务中覆盖init。 @SheeshMohsin请删除init并直接在run函数中获取arg def run(self, arg): 对arg执行某些操作 - Patlola Praveen
我认为你仍然需要明确地将任务注册到Celery中。 - undefined

5

当你拥有以下这些:

    a = A()

您可以做:

    A.foo.delay(a, param0, .., paramN)

干杯


如何使用apply_async完成相同的操作,以便将语法放在一个地方? - Asif Ali

2

我遇到了类似的情况,决定将类方法封装在一个简单的函数中,该函数将其参数重定向到类的实例并执行这些方法:

class A:
    def foo(self, bar):
       # do this

a = A()

@app.task
def a_wrapper(bar):
    return a.foo(bar)

# probably in a different size with an import in-place:

a_wrapper.delay(bar)

1

对我来说,唯一有效的是 celery.current_app,因为只有这个方法会将 self 传递给函数。

所以应该像这样:

from celery import current_app
from celery.contrib.methods import task_method

class A:
@current_app.task(filter=task_method, name='A.foo')
def foo(self, bar):
    ...

如果在不同的类中有同名方法,则必须使用名称。


1
正如@asksol所提到的,自Celery 4版本以来,此功能已被弃用。请参考http://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html#unscheduled-removals。 - Philipp Zettl

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接