Celery工作者休眠不正确

7
我有一个问题,我在Python中使用一个进程,必须等待X秒钟,进程本身能够正常工作,但是当我将它作为celery任务时就出现了问题。
当worker尝试在一个任务上执行time.sleep(X)时,它会暂停worker中的所有任务,例如:
我有Worker A,它可以同时执行4个任务(q,w,e和r),任务r需要睡眠1800秒,所以worker同时执行4个任务,但是当r任务睡眠时,worker也会停止q、w和e任务。
这是正常现象吗?你知道如何解决这个问题吗?
编辑: 这是带有我的beat和队列的celery.py示例。
app.conf.update(
CELERY_DEFAULT_QUEUE='default',
CELERY_QUEUES=(
    Queue('search', routing_key='search.#'),
    Queue('tests', routing_key='tests.#'),
    Queue('default',    routing_key='tasks.#'),
),

CELERY_DEFAULT_EXCHANGE='tasks',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_DEFAULT_ROUTING_KEY='tasks.default',
CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_TASK_SOFT_TIME_LIMIT=1800,
CELERY_ROUTES={
    'tests.tasks.volume': {
        'queue': 'tests',
        'routing_key': 'tests.volume',
    },
    'tests.tasks.summary': {
        'queue': 'tests',
        'routing_key': 'tests.summary',
    },
    'search.tasks.links': {
        'queue': 'search',
        'routing_key': 'search.links',
    },
    'search.tasks.urls': {
        'queue': 'search',
        'routing_key': 'search.urls',
    },
},

CELERYBEAT_SCHEDULE={
    # heavy one
    'each-hour-summary': {
        'task': 'tests.tasks.summary',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'each-hour-volume': {
        'task': 'tests.tasks.volume',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'links-each-cuarter': {
        'task': 'search.tasks.links',
        'schedule': crontab(minute='*/15'),
        'args': (),
    },
    'urls-each-ten': {
        'schedule': crontab(minute='*/10'),
        'task': 'search.tasks.urls',
        'args': (),
    },
}
)

test.tasks.py

@app.task
def summary():
    execute_sumary() #heavy task ~ 1 hour aprox

@app.task
def volume():
    execute_volume() #no important ~ less than 5 minutes

以及 search.tasks.py

@app.task
def links():
    free = search_links() #return boolean
    if free:
        process_links()
    else:
        time.sleep(1080) #<--------sleep with which I have problems
    process_links()

@app.task
def urls():
    execute_urls() #no important ~ less than 1 minute

嗯,我有两个工人,A负责队列搜索,B负责测试和默认任务。

问题出在A身上,当它执行“链接”任务并使用time.sleep()时,会停止工人正在执行的其他任务。

因为工人B正常工作,我认为问题在于time.sleep()函数。


你能提供一个简单的例子或者你的代码,以便更好地理解你的问题吗? - Michele d'Amico
我把我遇到问题的代码部分放在了这里。 - Luis Hernandez Donadeu
我不确定为什么会发生这种情况,但是为了解决您的问题,我建议使用 links.retry(countdown=1080),除非您有一些需要保留的顺序。 - user2097159
我尝试使用retry(),但问题在于我必须等待搜索链接()完成后,我不能在此之后取消并重试,因为在等待期间信息可能会发生变化,而我需要原始信息,该信息保存在本地,如果我取消任务,我将丢失原始信息。 - Luis Hernandez Donadeu
2个回答

5
如果您只有一个进程/线程,调用sleep()将会阻塞它。这意味着没有其他任务将运行...

1
您设置了CELERYD_TASK_SOFT_TIME_LIMIT=1800,但您的睡眠时间是1080。在此时间间隔内只能工作一个或两个任务。当启动celery worker时,请将CELERYD_TASK_SOFT_TIME_LIMIT设置为> (1080 +(工作时间))* 3,并设置更多的--concurency(> 4)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接