Django,FastCGI:如何管理长时间运行的进程?

8
我继承了一个django+fastcgi应用程序,需要修改以执行长时间的计算(长达半个小时或更长时间)。我想做的是在后台运行计算,并返回“您的作业已启动”类型的响应。当进程正在运行时,对url的进一步访问应返回“您的作业仍在运行”,直到作业完成,此时应返回作业的结果。对url的任何后续访问都应返回缓存的结果。
我是django的新手,十年来没有进行过任何重要的网络工作,因此我不知道是否有内置的方法可以实现我想要的功能。我尝试使用subprocess.Popen()启动进程,这很好,除了它在进程表中留下了一个defunct条目。我需要一个干净的解决方案,可以在进程完成后删除临时文件和任何进程的痕迹。
我还尝试过fork()和线程,但尚未找到可行的解决方案。是否有一个权威的解决方案,似乎这对我来说是一个非常常见的用例?FWIW,这将仅在具有非常低流量的内部服务器上使用。

请提供你正在使用的生成后台处理的代码。有很多种方法可以做到这一点,你使用的是哪一种? - S.Lott
2个回答

4

我现在需要解决一个类似的问题。这不是一个公共网站,而是一个内部流量较低的服务器。

技术限制:

  • 所有输入到长时间运行的进程的数据都可以在其启动时提供
  • 长时间运行的进程不需要用户交互(除了开始进程的初始输入)
  • 计算时间足够长,以至于结果无法在立即的HTTP响应中返回给客户端
  • 需要某种反馈(类似进度条)来自长时间运行的进程。

因此,我们至少需要两个web“视图”:一个用于启动长时间运行的进程,另一个用于监视其状态/收集结果。

我们还需要某种进程间通信:将用户数据从发起者(http请求上的web服务器)发送到长时间运行的进程,然后将其结果发送到接收者(再次通过http请求驱动的web服务器)。前者很容易,后者则不太明显。与正常的unix编程不同,接收者最初未知。接收者可能是与初始化程序不同的进程,并且它可能在长时间运行的任务仍在进行或已经完成时启动。因此,管道不起作用,我们需要某种长时间运行进程结果的永久性。

我看到了两种可能的解决方案:

  • 将长时间运行的进程发起请求传递给长时间运行的作业管理器(这可能就是上述django-queue-service所做的);
  • 将结果永久保存在文件或DB中。

我更喜欢使用临时文件并记住它们在会话数据中的位置。我认为这不能变得更简单。

一个作业脚本(这是长时间运行的进程),myjob.py:

import sys
from time import sleep

i = 0
while i < 1000:
    print 'myjob:', i  
    i=i+1
    sleep(0.1)
    sys.stdout.flush()

Django中的urls.py映射:

urlpatterns = patterns('',
(r'^startjob/$', 'mysite.myapp.views.startjob'),
(r'^showjob/$',  'mysite.myapp.views.showjob'),
(r'^rmjob/$',    'mysite.myapp.views.rmjob'),
)

Django视图:

from tempfile import mkstemp
from os import fdopen,unlink,kill
from subprocess import Popen
import signal

def startjob(request):
     """Start a new long running process unless already started."""
     if not request.session.has_key('job'):
          # create a temporary file to save the resuls
          outfd,outname=mkstemp()
          request.session['jobfile']=outname
          outfile=fdopen(outfd,'a+')
          proc=Popen("python myjob.py",shell=True,stdout=outfile)
          # remember pid to terminate the job later
          request.session['job']=proc.pid
     return HttpResponse('A <a href="/showjob/">new job</a> has started.')

def showjob(request):
     """Show the last result of the running job."""
     if not request.session.has_key('job'):
          return HttpResponse('Not running a job.'+\
               '<a href="/startjob/">Start a new one?</a>')
     else:
          filename=request.session['jobfile']
          results=open(filename)
          lines=results.readlines()
          try:
               return HttpResponse(lines[-1]+\
                         '<p><a href="/rmjob/">Terminate?</a>')
          except:
               return HttpResponse('No results yet.'+\
                         '<p><a href="/rmjob/">Terminate?</a>')
     return response

def rmjob(request):
     """Terminate the runining job."""
     if request.session.has_key('job'):
          job=request.session['job']
          filename=request.session['jobfile']
          try:
               kill(job,signal.SIGKILL) # unix only
               unlink(filename)
          except OSError, e:
               pass # probably the job has finished already
          del request.session['job']
          del request.session['jobfile']
     return HttpResponseRedirect('/startjob/') # start a new one

3
也许你可以从另一个角度来看待这个问题。
也许你可以尝试使用DjangoQueueService,并有一个“守护进程”监听队列,查看是否有新的内容并进行处理。

这绝对接近我所寻找的。我之前就遇到过这个问题,但我希望能找到一种不需要添加任何其他依赖项的解决方案。谢谢。 - Kevin Dente
你可以自己构建一个队列系统。我的意思是,这并不难做。 - changelog
作为 Django Queue Service 的创建者,我想说的是,最好考虑 celery 或其中一个排队服务。当时它是一个不错的 hack,但现在已经远远被超越了。 - heckj

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接