可以使用任何unittest库同步测试任务。当我处理celery任务时,我通常会进行两个不同的测试会话。第一个(如下所示)是完全同步的,并应该确保算法执行其应该执行的操作。第二个会话使用整个系统(包括代理),并确保我没有序列化问题或任何其他分发、通信问题。
因此:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
并且你的测试:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)
以下是我对七年前答案的更新:
通过 pytest fixture
,您可以在单独的线程中运行一个 worker:
https://docs.celeryq.dev/en/v5.2.6/userguide/testing.html#celery-worker-embed-live-worker
根据文档,您不应该使用 "always_eager"
(请参见上面链接页面的顶部)。
旧的答案:
我使用这个:
with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
...
文档:https://docs.celeryq.dev/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER
允许您同步运行任务,而无需使用celery服务器。
ImportError:找不到celeryconfig模块
。 - Daenythceleryconfig.py
存在于某个包中。请参阅http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#configuration。 - Kamil SindiCELERY_TASK_ALWAYS_EAGER
。 - krassowskiCELERY_TASK_ALWAYS_EAGER
?没有解释,我也看不出这个建议的逻辑。 - Ariel取决于您要测试的确切内容。
import unittest
from myproject.myapp import celeryapp
class TestMyCeleryWorker(unittest.TestCase):
def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
# conftest.py
from myproject.myapp import celeryapp
@pytest.fixture(scope='module')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp
# test_tasks.py
def test_some_task(celery_app):
...
from celery import current_app
def send_task(name, args=(), kwargs={}, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)
current_app.send_task = send_task
对于使用 Celery 4 的用户,需要进行如下操作:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
由于设置名称已更改,如果您选择升级,则需要更新这些名称,请参见
从Celery 3.0开始,在Django中设置CELERY_ALWAYS_EAGER
的一种方法是:
from django.test import TestCase, override_settings
from .foo import foo_celery_task
class MyTest(TestCase):
@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())
def test_myfunc_is_executed(celery_session_worker):
# celery_session_worker: <Worker: gen93553@mymachine.local (running)>
assert myfunc.delay().wait(3)
在http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test中描述的其他固定装置中,您可以通过重新定义celery_config
装置来更改celery的默认选项:
@pytest.fixture(scope='session')
def celery_config():
return {
'accept_content': ['json', 'pickle'],
'result_serializer': 'pickle',
}
默认情况下,测试工作器使用内存代理和结果后端。如果不需要测试特定功能,则无需使用本地Redis或RabbitMQ。
参考文献,使用pytest。
def test_add(celery_worker):
mytask.delay()
如果您使用Flask,请设置应用程序配置
CELERY_BROKER_URL = 'memory://'
CELERY_RESULT_BACKEND = 'cache+memory://'
并且在conftest.py
中
@pytest.fixture
def app():
yield app # Your actual Flask application
@pytest.fixture
def celery_app(app):
from celery.contrib.testing import tasks # need it
yield celery_app # Your actual Flask-Celery application
django.test
? - Eduardo Gomescelery==5.2.3
,这是唯一实际可行的方法,除此之外还需设置CELERY_SETTINGS = {"broker_url": "memory://", "result_backend": "cache+memory://", "task_always_eager": True}
。 - CaffeinatedMikeproj/tasks.py
@shared_task(bind=True)
def add_task(self, a, b):
return a+b;
tests/test_tasks.py
from proj import add_task
def test_add():
assert add_task(1, 2) == 3, '1 + 2 should equal 3'
但是,由于shared_task
装饰器执行了许多Celery内部逻辑,它并不是真正的单元测试。
因此,对我来说,有两个选项:
选项1:分离内部逻辑
proj/tasks_logic.py
def internal_add(a, b):
return a + b;
proj/tasks.py
from .tasks_logic import internal_add
@shared_task(bind=True)
def add_task(self, a, b):
return internal_add(a, b);
这看起来非常奇怪,除了使其不易读懂外,还需要手动提取和传递请求中的属性,例如如果需要task_id
,这会使逻辑不够纯粹。
选项2:模拟
模拟celery内部
tests/__init__.py
# noinspection PyUnresolvedReferences
from celery import shared_task
from mock import patch
def mock_signature(**kwargs):
return {}
def mocked_shared_task(*decorator_args, **decorator_kwargs):
def mocked_shared_decorator(func):
func.signature = func.si = func.s = mock_signature
return func
return mocked_shared_decorator
patch('celery.shared_task', mocked_shared_task).start()
这样我就能够模拟请求对象(再次提醒,如果您需要从请求中获取内容,如id或重试计数器)。
tests/test_tasks.py
from proj import add_task
class MockedRequest:
def __init__(self, id=None):
self.id = id or 1
class MockedTask:
def __init__(self, id=None):
self.request = MockedRequest(id=id)
def test_add():
mocked_task = MockedTask(id=3)
assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'
这个方案需要手动操作,但它给了我更多的控制,让我能够真正进行单元测试,而不需要反复重复,也不会失去celery的范围。
https://docs.celeryproject.org/en/stable/userguide/testing.html
对于使用单元测试的渴望模式,这里引用了实际文档中的一句话:
celery.loader.import_default_modules()
。 - FlaPer87task.appl().get()
和使用 eager 标志有什么区别/优势?谢谢。 - Jay