在特定任务后关闭Celery工作进程

29
我正在使用celery(使用concurrency=1的solo池),我希望能够在特定任务运行后关闭worker。但需要注意的是,我希望避免工作线程在完成该任务后接收到其他任务。
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    raise WorkerShutdown()

然而,当我运行工作线程时

celery -A celeryapp  worker --concurrency=1 --pool=solo

并运行该任务

add.delay(1,4)

我得到以下内容:

 -------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f596896ce90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

任务被重新排队,将在另一个工作进程上再次运行,从而形成循环。

当我把WorkerShutdown异常放在任务本身中时,也会发生这种情况。

@app.task
def add(x, y):
    print(x + y)
    raise WorkerShutdown()
有没有一种方法可以在特定任务完成后关闭worker,同时避免不良副作用?

尝试使用 os.kill(os.getpid(), signal.SIGTERM) 看看是否有帮助。请尝试您已经尝试过的两种方法。 - Tarun Lalwani
如果worker是celery进程的子进程,则应尝试使用 os.getppid() - Tarun Lalwani
这里的想法是工作进程在完成单个任务后不会重新启动吗? - MrName
是的,那就是想法。 - samfrances
@samfrances,您还没有对任何内容提供反馈或评论。请更新。 - Tarun Lalwani
3个回答

12

关闭worker的推荐方法是发送TERM信号。这将导致celery worker在完成当前运行的任务后关闭。如果您向worker的主进程发送QUIT信号,则worker将立即关闭。

然而,Celery文档通常讨论如何从命令行或通过systemd/initd管理Celery,但是Celery还提供了通过celery.app.control进行远程worker控制API。
您可以撤销一个任务以防止worker执行该任务。这应该可以防止您遇到的循环。此外,control还支持以这种方式关闭一个worker。

因此,我认为以下内容可以获得您所需的行为。

@app.task(bind=True)
def shutdown(self):
    app.control.revoke(self.id) # prevent this task from being executed again
    app.control.shutdown() # send shutdown signal to all workers

由于目前无法在任务内部确认任务,因此继续执行该任务,使用revoke方法就可以解决这个问题,以便即使任务再次排队,新的工作进程也将简单地忽略它。

或者,以下方法还可以防止重新发送的任务被执行两次...

@app.task(bind=True)
def some_task(self):
    if self.request.delivery_info['redelivered']:
        raise Ignore() # ignore if this task was redelivered
    print('This should only execute on first receipt of task')

值得注意的是,AsyncResult 还有一个 revoke 方法,它会为您调用 self.app.control.revoke


5

如果您关闭了工作进程,在任务完成后,它将不会再次排队。

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    app.control.broadcast('shutdown')

这将在任务完成后优雅地关闭工作进程。
[2018-04-01 18:44:14,627: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-04-01 18:44:14,656: INFO/MainProcess] mingle: searching for neighbors
[2018-04-01 18:44:15,719: INFO/MainProcess] mingle: all alone
[2018-04-01 18:44:15,742: INFO/MainProcess] celery@foo ready.
[2018-04-01 18:46:28,572: INFO/MainProcess] Received task: celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b]
[2018-04-01 18:46:28,585: INFO/ForkPoolWorker-4] Task celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b] succeeded in 0.005628278013318777s: 3   
[2018-04-01 18:46:28,665: WARNING/MainProcess] Got shutdown from remote

注意:广播会关闭所有的工作进程。如果你想关闭特定的工作进程,请使用名称启动该工作进程。
celery -A celeryapp  worker -n self_killing --concurrency=1 --pool=solo

现在您可以使用目标参数关闭此功能。
app.control.broadcast('shutdown', destination=['celery@self_killing'])

3

如果您需要关闭特定的工作程序,但事先不知道它的名称,则可以从任务属性中获取它。基于以上答案,您可以使用以下方法:

app.control.shutdown(destination=[self.request.hostname])

或者
app.control.broadcast('shutdown', destination=[self.request.hostname])

注意:

  • 一个worker应该使用名称(选项'-n')来启动;
  • 任务应该使用bind=True参数进行定义。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接