适当模拟被调用在另一个celery任务内部的celery任务

8
如何正确地模拟在另一个celery任务中调用的celery任务?(下面是虚拟代码)
@app.task
def task1(smthg):
    do_so_basic_stuff_1
    do_so_basic_stuff_2
    other_thing(smthg)

@app.task
def task2(smthg):
    if condition:
        task1.delay(smthg[1])
    else:
        task1.delay(smthg)

我有一个完全相同的代码结构在my_module.py中。位置在proj/cel/my_module.py。 我试着在proj/tests/cel_test/test.py里写测试。
测试函数:
def test_this_thing(self):
    # firs I want to mock task1
    # i've tried to import it from my_module.py to test.py and then mock it from test.py namespace 
    # i've tried to import it from my_module.py and mock it
    # nothing worked for me

    # what I basically want to do 
    # mock task1 here
    # and then run task 2 (synchronous)
    task2.apply()
    # and then I want to check if task one was called 
    self.assertTrue(mocked_task1.called)
2个回答

10

你没有调用task1()task2(),而是它们的方法:delay()apply() - 所以你需要测试这些方法是否被调用。

这是我根据你的代码编写的一个工作示例:

tasks.py

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def task1():
    return 'task1'

@app.task
def task2():
    task1.delay()

test.py

from tasks import task2

def test_task2(mocker):
    mocked_task1 = mocker.patch('tasks.task1')
    task2.apply()
    assert mocked_task1.delay.called

测试结果:

$ pytest -vvv test.py
============================= test session starts ==============================
platform linux -- Python 3.5.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /home/kris/.virtualenvs/3/bin/python3
cachedir: .cache
rootdir: /home/kris/projects/tmp, inifile:
plugins: mock-1.6.2, celery-4.1.0
collected 1 item                                                                

test.py::test_task2 PASSED

=========================== 1 passed in 0.02 seconds ===========================

1
为了进行更完整的测试,您还可以直接模拟 delayapply 方法。 - Titouan Freville

4
首先,测试Celery任务可能非常困难。我通常会将所有逻辑放入一个不是任务的函数中,然后创建一个只调用该函数的任务,以便您可以正确地测试逻辑。
其次,我认为您不想在任务内部调用任务(不确定,但我认为这通常不被推荐)。相反,根据您的需求,您应该进行链接或分组:
http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives
最后,回答您实际的问题,您需要在代码中确切发生delay方法的位置处打补丁,如此帖子所述。

2
只要你不等待另一个任务的结果(使用其结果),在其他任务中调用任务没有任何问题。 - zerohedge

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接