我在这个问题上做了很多研究,但令我惊讶的是,在任何地方我都没有找到一个好的答案。
我在Heroku上运行一个大型应用程序,并且有一些长时间运行的celery任务,任务处理完成后会保存结果。每次我在Heroku上重新部署时,它都会发送SIGTERM(最终发送SIGKILL)并杀死我的正在运行的worker。 我正在寻找一种方法让worker实例优雅地关闭自己并重新排队以后进行处理,这样我们最终可以保存所需的结果,而不是丢失排队的任务。
我无法找到一种有效的方法使worker正确监听SIGTERM。到目前为止,我最接近的方式是直接运行python manage.py celeryd
时有效,但用foreman模拟Heroku时无效,代码如下:
@app.task(bind=True, max_retries=1)
def slow(self, x):
try:
for x in range(100):
print 'x: ' + unicode(x)
time.sleep(10)
except exceptions.MaxRetriesExceededError:
logger.error('whoa')
except (exceptions.WorkerShutdown, exceptions.WorkerTerminate) as exc:
logger.error(u'retrying, ' + unicode(exc))
raise self.retry(exc=exc, countdown=10)
except (KeyboardInterrupt, SystemExit) as exc:
print 'retrying'
raise self.retry(exc=exc, countdown=10)
else:
return x
finally:
logger.info('task ended!')
我在Foreman内启动celery任务,并按下Ctrl+C时,会出现以下情况:
^CSIGINT received
22:20:59 system | sending SIGTERM to all processes
22:20:59 web.1 | exited with code 0
22:21:04 system | sending SIGKILL to all processes
Killed: 9
很明显, celery 的任何异常、KeyboardInterrupt
或 SystemExit
异常(前面的帖子中提到过)都不能正确捕获 SIGTERM 并关闭 worker。
那么该如何正确地做呢?