我有一个最简单的Celery任务示例(case.py
):
import time
import celery
app = celery.Celery('case', broker='redis://localhost')
@app.task
def do_sth():
time.sleep(5)
我创建了几个任务实例:
>>> import case
>>> tasks_list = [case.do_sth.delay() for i in range(4)]
我希望可以有动态数量的工作进程,使用--concurrency=1
参数。对于我的情况来说,每个工作进程只能有一个并行任务,并且当一些任务在服务器上排队时,需要添加工作进程。如果某些任务没有它们自己的工作进程,而新的工作进程被添加,则新的工作进程应该处理排队的(等待但未执行的)任务。但是,当我调用$ celery -A case worker --loglevel=info --concurrency=1
时,我得到以下日志(不包括Celery标志):
[tasks]
. case.do_sth
[2017-02-21 10:03:18,454: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:18,459: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:19,471: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:19,480: INFO/MainProcess] celery@pt0 ready.
[2017-02-21 10:03:19,658: INFO/MainProcess] Received task: case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c]
[2017-02-21 10:03:19,660: INFO/MainProcess] Received task: case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b]
[2017-02-21 10:03:19,662: INFO/MainProcess] Received task: case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803]
[2017-02-21 10:03:19,664: INFO/MainProcess] Received task: case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d]
[2017-02-21 10:03:24,667: INFO/PoolWorker-1] Task case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c] succeeded in 5.006301835s: None
[2017-02-21 10:03:29,675: INFO/PoolWorker-1] Task case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b] succeeded in 5.005384011s: None
[2017-02-21 10:03:34,683: INFO/PoolWorker-1] Task case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803] succeeded in 5.005373027s: None
[2017-02-21 10:03:39,690: INFO/PoolWorker-1] Task case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d] succeeded in 5.00531687s: None
同时,我启动了另一个工作进程(使用相同的命令),但它没有做任何事情:
[tasks]
. case.do_sth
[2017-02-21 10:03:20,321: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:20,326: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:21,339: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:21,352: INFO/MainProcess] celery@pt0 ready.
如果你检查日期,你会发现第二个工作人员在第一个工作人员触发后不到五秒钟就被触发了。
有没有一种方法(某些工作人员或Celery选项)可以将排队在单个工作人员上的任务限制为恰好一个任务?