我有一个问题,我在Python中使用一个进程,必须等待X秒钟,进程本身能够正常工作,但是当我将它作为celery任务时就出现了问题。
当worker尝试在一个任务上执行time.sleep(X)时,它会暂停worker中的所有任务,例如:
我有Worker A,它可以同时执行4个任务(q,w,e和r),任务r需要睡眠1800秒,所以worker同时执行4个任务,但是当r任务睡眠时,worker也会停止q、w和e任务。
这是正常现象吗?你知道如何解决这个问题吗?
编辑: 这是带有我的beat和队列的celery.py示例。
当worker尝试在一个任务上执行time.sleep(X)时,它会暂停worker中的所有任务,例如:
我有Worker A,它可以同时执行4个任务(q,w,e和r),任务r需要睡眠1800秒,所以worker同时执行4个任务,但是当r任务睡眠时,worker也会停止q、w和e任务。
这是正常现象吗?你知道如何解决这个问题吗?
编辑: 这是带有我的beat和队列的celery.py示例。
app.conf.update(
CELERY_DEFAULT_QUEUE='default',
CELERY_QUEUES=(
Queue('search', routing_key='search.#'),
Queue('tests', routing_key='tests.#'),
Queue('default', routing_key='tasks.#'),
),
CELERY_DEFAULT_EXCHANGE='tasks',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_DEFAULT_ROUTING_KEY='tasks.default',
CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_TASK_SOFT_TIME_LIMIT=1800,
CELERY_ROUTES={
'tests.tasks.volume': {
'queue': 'tests',
'routing_key': 'tests.volume',
},
'tests.tasks.summary': {
'queue': 'tests',
'routing_key': 'tests.summary',
},
'search.tasks.links': {
'queue': 'search',
'routing_key': 'search.links',
},
'search.tasks.urls': {
'queue': 'search',
'routing_key': 'search.urls',
},
},
CELERYBEAT_SCHEDULE={
# heavy one
'each-hour-summary': {
'task': 'tests.tasks.summary',
'schedule': crontab(minute='0', hour='*/1'),
'args': (),
},
'each-hour-volume': {
'task': 'tests.tasks.volume',
'schedule': crontab(minute='0', hour='*/1'),
'args': (),
},
'links-each-cuarter': {
'task': 'search.tasks.links',
'schedule': crontab(minute='*/15'),
'args': (),
},
'urls-each-ten': {
'schedule': crontab(minute='*/10'),
'task': 'search.tasks.urls',
'args': (),
},
}
)
test.tasks.py
@app.task
def summary():
execute_sumary() #heavy task ~ 1 hour aprox
@app.task
def volume():
execute_volume() #no important ~ less than 5 minutes
以及 search.tasks.py
@app.task
def links():
free = search_links() #return boolean
if free:
process_links()
else:
time.sleep(1080) #<--------sleep with which I have problems
process_links()
@app.task
def urls():
execute_urls() #no important ~ less than 1 minute
嗯,我有两个工人,A负责队列搜索,B负责测试和默认任务。
问题出在A身上,当它执行“链接”任务并使用time.sleep()时,会停止工人正在执行的其他任务。
因为工人B正常工作,我认为问题在于time.sleep()函数。
links.retry(countdown=1080)
,除非您有一些需要保留的顺序。 - user2097159