Celery工作者队列中排队的任务数量限制

4

我有一个最简单的Celery任务示例(case.py):

import time
import celery


app = celery.Celery('case', broker='redis://localhost')

@app.task
def do_sth():
    time.sleep(5)

我创建了几个任务实例:

>>> import case
>>> tasks_list = [case.do_sth.delay() for i in range(4)]

我希望可以有动态数量的工作进程,使用--concurrency=1参数。对于我的情况来说,每个工作进程只能有一个并行任务,并且当一些任务在服务器上排队时,需要添加工作进程。如果某些任务没有它们自己的工作进程,而新的工作进程被添加,则新的工作进程应该处理排队的(等待但未执行的)任务。但是,当我调用$ celery -A case worker --loglevel=info --concurrency=1时,我得到以下日志(不包括Celery标志):

[tasks]
  . case.do_sth

[2017-02-21 10:03:18,454: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:18,459: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:19,471: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:19,480: INFO/MainProcess] celery@pt0 ready.
[2017-02-21 10:03:19,658: INFO/MainProcess] Received task: case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c]
[2017-02-21 10:03:19,660: INFO/MainProcess] Received task: case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b]
[2017-02-21 10:03:19,662: INFO/MainProcess] Received task: case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803]
[2017-02-21 10:03:19,664: INFO/MainProcess] Received task: case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d]
[2017-02-21 10:03:24,667: INFO/PoolWorker-1] Task case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c] succeeded in 5.006301835s: None
[2017-02-21 10:03:29,675: INFO/PoolWorker-1] Task case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b] succeeded in 5.005384011s: None
[2017-02-21 10:03:34,683: INFO/PoolWorker-1] Task case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803] succeeded in 5.005373027s: None
[2017-02-21 10:03:39,690: INFO/PoolWorker-1] Task case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d] succeeded in 5.00531687s: None

同时,我启动了另一个工作进程(使用相同的命令),但它没有做任何事情:

[tasks]
  . case.do_sth

[2017-02-21 10:03:20,321: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:20,326: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:21,339: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:21,352: INFO/MainProcess] celery@pt0 ready.

如果你检查日期,你会发现第二个工作人员在第一个工作人员触发后不到五秒钟就被触发了。

有没有一种方法(某些工作人员或Celery选项)可以将排队在单个工作人员上的任务限制为恰好一个任务?

1个回答

4

我认为你正在寻找的是预取限制(prefetch limit)。它有效地减少了工作进程可以在给定时间内保留的任务数。

预取限制(prefetch limit)是一个限制,用于控制工作进程可以预取的任务(消息)数量。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接