Celery定期任务执行其他Celery任务不起作用。

4
我有一个API,它返回一系列其他API。我需要每15分钟访问这些API,并将返回的数据放入数据库中。
以下是我在celery_worker.py文件中使用celery和redis编写的内容。但是所有任务都没有启动。
list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json()

CELERYBEAT_SCHEDULE = {
    'every-15-minute': {
        'task': 'fetch_data_of_all_APIs',
        'schedule': timedelta(minutes=15),
    },
}

@celery.task
def access_one_API(one_API):
    return requests.get(one_API).json()

@celery.task(name='fetch_data_of_all_APIs')
def fetch_data_of_all_APIs():
    for one_API in list_of_APIs:
          task = access_one_API.delay(one_API)
          # some codes to put all task.id into a list_of_task_id

    for task_id in list_of_task_id:
          # some codes to get the results of all tasks
          # some codes to put all the results into a database
fetch_data_of_all_APIs函数应该每15分钟运行一次,这个函数使用多个worker来运行access_one_API函数。
虽然celery服务器在终端中成功启动,但是fetch_data_of_all_APIsaccess_one_API都没有启动。
如果我将fetch_data_of_all_APIs函数内的代码拿出来,access_one_API就可以启动并由多个celery worker执行。但是,一旦我将这些代码放入一个函数中,并用@celery.task装饰它,那么这两个函数都不会启动。
所以我认为这必须与celery有关。
非常感谢。

请注意,您需要使用@celery.task()装饰器。此外,您需要检查celery-beat配置参数,因为当前的Celery版本使用小写设置。 - Prashant Sinha
1个回答

3

以下是使用celery配置周期任务及其子任务的示例(本示例将20秒作为演示时间)。tasks.py:

import celery
from celery.canvas import subtask
from celery.result import AsyncResult
# just for example list of integer values
list_of_APIs = [1, 2, 3, 4]


@celery.task(name='access_one_API')
def access_one_API(api):
    """
    Sum of subtask for demonstration
    :param int api:
    :return: int
    """
    return api + api


 @celery.task(name='fetch_data_of_all_APIs')
 def fetch_data_of_all_APIs(list_of_APIs):
    list_task_ids = []

    for api in list_of_APIs: 
        # run of celery subtask and collect id's of subtasks
        task_id = subtask('access_one_API', args=(api, )).apply_async().id
        list_task_ids.append(task_id)

    result_sub_tasks = {}

    for task_id in list_task_ids:
        while True:
            task_result = AsyncResult(task_id)
            if task_result.status == 'SUCCESS':
                # if subtask is finish add result and check result of next subtask
                result_sub_tasks[task_id] = task_result.result

                break

    print result_sub_tasks
    # do something with results of subtasks here...


app = celery.Celery(
   'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)


app.conf.beat_schedule = {
    'add-every-20-seconds': {
        'task': 'fetch_data_of_all_APIs',
        'schedule': 20.0,
        # args for fetch_data_of_all_APIs
       'args': (list_of_APIs, )
    },
}

运行celery: celery worker -A tasks.app --loglevel=info --beat

从终端跟踪:

[2017-03-14 10:31:36,361: WARNING/PoolWorker-3] {'929996b3-fc86-4274-b3c3-06c38a6d4edd': 6, 'f44456b4-df93-4a78-9f1d-b2c2d2b05322': 4, '4e44fd57-fbbc-43cd-8616-1eafef559417': 8, '6d943f35-0d74-4319-aa02-30a266aa3cd9': 2}

希望这有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接