我有一个API,它返回一系列其他API。我需要每15分钟访问这些API,并将返回的数据放入数据库中。
以下是我在celery_worker.py文件中使用celery和redis编写的内容。但是所有任务都没有启动。
虽然celery服务器在终端中成功启动,但是
如果我将
所以我认为这必须与celery有关。
非常感谢。
以下是我在celery_worker.py文件中使用celery和redis编写的内容。但是所有任务都没有启动。
list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json()
CELERYBEAT_SCHEDULE = {
'every-15-minute': {
'task': 'fetch_data_of_all_APIs',
'schedule': timedelta(minutes=15),
},
}
@celery.task
def access_one_API(one_API):
return requests.get(one_API).json()
@celery.task(name='fetch_data_of_all_APIs')
def fetch_data_of_all_APIs():
for one_API in list_of_APIs:
task = access_one_API.delay(one_API)
# some codes to put all task.id into a list_of_task_id
for task_id in list_of_task_id:
# some codes to get the results of all tasks
# some codes to put all the results into a database
fetch_data_of_all_APIs
函数应该每15分钟运行一次,这个函数使用多个worker来运行access_one_API
函数。虽然celery服务器在终端中成功启动,但是
fetch_data_of_all_APIs
和access_one_API
都没有启动。如果我将
fetch_data_of_all_APIs
函数内的代码拿出来,access_one_API
就可以启动并由多个celery worker执行。但是,一旦我将这些代码放入一个函数中,并用@celery.task
装饰它,那么这两个函数都不会启动。所以我认为这必须与celery有关。
非常感谢。
@celery.task()
装饰器。此外,您需要检查celery-beat
配置参数,因为当前的Celery版本使用小写设置。 - Prashant Sinha