如何对Celery任务进行单元测试?

152
11个回答

88

可以使用任何unittest库同步测试任务。当我处理celery任务时,我通常会进行两个不同的测试会话。第一个(如下所示)是完全同步的,并应该确保算法执行其应该执行的操作。第二个会话使用整个系统(包括代理),并确保我没有序列化问题或任何其他分发、通信问题。

因此:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

并且你的测试:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

1
这个在除了使用 HttpDispatchTask 的任务上是有效的 - http://docs.celeryproject.org/en/latest/userguide/remote-tasks.html,在这种情况下,我必须设置 celery.conf.CELERY_ALWAYS_EAGER = True,但即使同时设置 celery.conf.CELERY_IMPORTS = ('celery.task.http'),测试仍然会失败,并显示 NotRegistered: celery.task.http.HttpDispatchTask。 - davidmytton
奇怪,你确定没有导入问题吗?这个test是有效的(请注意,我正在伪造响应,以便返回celery所期望的内容)。此外,在workers initialization期间将导入在CELERY_IMPORTS中定义的模块,为了避免这种情况,建议您调用celery.loader.import_default_modules() - FlaPer87
我建议你看一下这里:https://github.com/celery/celery/blob/master/celery/tests/tasks/test_http.py。它模拟了HTTP请求。不知道是否有帮助,我猜你想测试一个正在运行的服务,对吧? - FlaPer87
3
使用 task.appl().get() 和使用 eager 标志有什么区别/优势?谢谢。 - Jay

70

以下是我对七年前答案的更新:

通过 pytest fixture,您可以在单独的线程中运行一个 worker:

https://docs.celeryq.dev/en/v5.2.6/userguide/testing.html#celery-worker-embed-live-worker

根据文档,您不应该使用 "always_eager"(请参见上面链接页面的顶部)。


旧的答案:

我使用这个:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
        ...

文档:https://docs.celeryq.dev/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER允许您同步运行任务,而无需使用celery服务器。


3
我认为这已经过时了 - 我得到了ImportError:找不到celeryconfig模块 - Daenyth
7
我认为上述假设了模块celeryconfig.py存在于某个包中。请参阅http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration。 - Kamil Sindi
1
我知道这很老,但你能提供一个完整的示例来启动OP问题中的“add”任务在一个“TestCase”类中吗? - Kruupös
1
@miken32 谢谢。由于最近的回答已经解决了我想帮忙的问题,我只是留了一个评论,指出官方文档不鼓励在单元测试中使用 CELERY_TASK_ALWAYS_EAGER - krassowski
1
为什么文档不建议在测试中使用CELERY_TASK_ALWAYS_EAGER?没有解释,我也看不出这个建议的逻辑。 - Ariel
显示剩余4条评论

42

取决于您要测试的确切内容。

  • 直接测试任务代码。在单元测试中,不要调用"task.delay(...)",而是直接调用"task(...)"。
  • 使用CELERY_ALWAYS_EAGER。这将导致您的任务立即被调用,在您说"task.delay(...)"的那一点进行测试,因此您可以测试整个路径(但不包括任何异步行为)。

39

单元测试

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test fixtures

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

附加说明:使send_task遵守eager

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task

38

14
根据官方文档,在单元测试中使用"task_always_eager"(早期版本为"CELERY_ALWAYS_EAGER")是不合适的。相反,他们提出了其他很好的方法来对你的 Celery 应用进行单元测试。 - krassowski
6
我想强调一下为什么在单元测试中不希望使用急切任务,因为这样你就无法测试例如参数序列化的操作,而这些操作会在代码投入生产时发生。 - damd

19

Celery 3.0开始,在Django中设置CELERY_ALWAYS_EAGER的一种方法是:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

当Celery任务在函数内部时,似乎它不起作用。 - Sandeep Balagopal

13
自 Celery v4.0 版本以来,py.test fixtures 提供了启动 celery worker 进行测试,并在完成后关闭的功能:provided
def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@mymachine.local (running)>
    assert myfunc.delay().wait(3)

http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test中描述的其他固定装置中,您可以通过重新定义celery_config装置来更改celery的默认选项:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

默认情况下,测试工作器使用内存代理和结果后端。如果不需要测试特定功能,则无需使用本地Redis或RabbitMQ。


亲爱的点踩者,您能分享一下为什么这个回答不好吗?非常感谢。 - alanjds
2
对我没用,测试套件一直卡住了。你能提供更多的上下文吗?(虽然我还没有投票;)) - duality_
1
在我的情况下,我必须明确设置celey_config fixture以使用内存代理和缓存+内存后端。 - sanzoghenzo

12

参考文献,使用pytest。

def test_add(celery_worker):
    mytask.delay()

如果您使用Flask,请设置应用程序配置

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

并且在conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application

这个应用程序配置的内存和缓存是否也适用于 django.test - Eduardo Gomes
我在互联网上阅读了很多内容,但只有这个东西为我提供了一些启示,让我能够使用Unittest库调用一系列任务。我还使用了task_always_eager(我知道这不是最好的选择),但我只想测试链式错误处理。 - Gabor
1
2022年使用celery==5.2.3,这是唯一实际可行的方法,除此之外还需设置CELERY_SETTINGS = {"broker_url": "memory://", "result_backend": "cache+memory://", "task_always_eager": True} - CaffeinatedMike

6
在我的情况下(我想很多人也是这样),我只是想使用pytest测试任务的内部逻辑。 简而言之,最终将所有内容模拟掉了(选项2)。
示例用例proj/tasks.py
@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

但是,由于shared_task装饰器执行了许多Celery内部逻辑,它并不是真正的单元测试。

因此,对我来说,有两个选项:

选项1:分离内部逻辑

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

这看起来非常奇怪,除了使其不易读懂外,还需要手动提取和传递请求中的属性,例如如果需要task_id,这会使逻辑不够纯粹。

选项2:模拟
模拟celery内部

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

这样我就能够模拟请求对象(再次提醒,如果您需要从请求中获取内容,如id或重试计数器)。

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

这个方案需要手动操作,但它给了我更多的控制,让我能够真正进行单元测试,而不需要反复重复,也不会失去celery的范围。


3
我看到许多单元测试方法中都有“CELERY_ALWAYS_EAGER = true”作为解决方案,但自从版本5.0.5发布以来,有很多变化使得大部分旧答案都已过时,对我来说是一种浪费时间的废话。因此,对于在这里寻找解决方案的每个人,请查阅文档并阅读新版本的良好文档化的单元测试示例。

https://docs.celeryproject.org/en/stable/userguide/testing.html

对于使用单元测试的渴望模式,这里引用了实际文档中的一句话:
“渴望模式启用的任务(通过task_always_eager设置)因其定义而不适合进行单元测试。在使用渴望模式进行测试时,您只是在测试工作程序的仿真,而仿真与实际情况之间存在许多差异。”

1
文档似乎只针对pytest,而不是django默认的unittest。如果他们有一些使用标准django测试设置的示例,那就太棒了。 - sur.la.route

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接