具有超时功能的异步子进程

6
我在Python 3中使用异步子进程生成超时存在问题。
我的目标是:我想异步生成多个进程,而不等待结果,但我也想确保每个生成的进程都会在给定的超时时间内结束。
我在这里找到了类似的问题:Using module 'subprocess' with timeoutAsynchronous background processes in Python? ,但它们不能解决我的问题。
我的代码如下所示。我有一个命令类,如使用模块'subprocess' with timeout中建议的那样:
class Command(object):
  def __init__(self, cmd):
    self.cmd = cmd
    self.process = None

  def run(self, timeout):
    def target():
      print('Thread started')
      args = shlex.split(self.cmd)
      self.process = subprocess.Popen(args, shell=True)
      self.process.communicate()
      print('Thread finished')

    thread = threading.Thread(target=target)
    thread.start()

    thread.join(timeout)
    if thread.is_alive():
      print('Terminating process')
      self.process.terminate()
      thread.join()

然后当我想要生成子进程时:

for system in systems:
  for service in to_spawn_system_info:
    command_str = "cd {0} && python proc_ip.py {1} {2} 0 2>>{3}".format(home_dir,
        service, system, service_log_dir)
    command = Command(command_str)
    command.run(timeout=60)

当我运行这个命令时,输出似乎等待每个命令的生成和结束。我得到:
Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished
Thread started
Thread finished

我的问题是我做错了什么?现在我开始怀疑是否可能生成一个进程并通过超时来限制其执行。

为什么我需要这个?生成器脚本将在cron中运行。它将每10分钟执行一次,并且必须生成约20个子进程。我希望保证每个子进程都会在下一次从cron运行脚本之前结束。

3个回答

2

如之前所述,调用process.communicate()会使您的代码等待子进程完成。但是,如果您只删除communicate()调用,则线程将在生成进程后立即退出,导致您的thread.join()调用过早退出,并且您将过早地终止子进程。为了避免轮询或忙等待,您可以设置一个定时器,在超时后如果进程尚未完成,则杀死进程(和运行线程):

class Command(object):
  def __init__(self, cmd):
    self.cmd = cmd
    self.process = None

  def run(self, timeout):
    def target():
      print('Thread started')
      # May want/need to skip the shlex.split() when using shell=True
      # See Popen() constructor docs on 'shell' argument for more detail.
      args = shlex.split(self.cmd)
      self.process = subprocess.Popen(args, shell=True)
      self.timer.start()
      self.process.wait()
      self.timer.cancel()

    def timer_callback():
        print('Terminating process (timed out)')
        self.process.terminate()

    thread = threading.Thread(target=target)
    self.timer = threading.Timer(timeout, timer_callback)
    thread.start()

当我尝试这个解决方案时,它没有在超时后终止我的线程。我将超时设置为1秒,并在目标函数中添加了time.sleep(1)。没有线程被终止。 - sebast26
嗯,当target()退出时,线程应该终止。请记住,如上所述,如果进程正常退出而没有超时,则不会得到打印输出。我会仔细检查一下,可能是我忽略了什么。 - mshildt
1
unutbu 是正确的,但是我在线程目标中添加了 self.process.wait() 以使线程等待子进程完成,这样线程在子进程完成之前不会退出。然而,我的解决方案有些问题。我正在尝试解决它,但似乎无法正确运行进程...仍在研究中。 - mshildt
如果您的计时器在线程目标中的subprocess.Popen()调用之前执行,则self.process仍将为None。我在上面的代码中将timer.start()调用移至thread.start()调用下方,以避免这种情况。这样修复了吗? - mshildt
好的,我刚刚更新了代码,通过将计时器启动移动到线程目标中,修复/避免了任何奇怪的竞争条件。现在一旦进入计时器回调,self.process就不应该为None。 - mshildt
显示剩余5条评论

1

使用可以独立启动和结束的线程。如果您事先知道要运行的所有命令,则此方法将非常有用。以下是一个示例...

from threading import Thread
import subprocess
import Queue
import multiprocessing

class Command(object):
    def __init__(self, cmds):
        self.cmds = cmds

    def run_cmds(self):
        cmd_queue = Queue.Queue()
        for cmd in self.cmds:
            cmd_queue.put(cmd)

        available_threads = multiprocessing.cpu_count()
        for x in range(0,available_threads):
            t = Thread(target=self.run_cmd,args=(cmd_queue,))
            t.setDaemon(True)
            t.start()

        cmd_queue.join()


    def run_cmd(self, cmd_queue):
        while True:
            try: cmd = cmd_queue.get()
            except: break
            print 'Thread started'
            process = subprocess.Popen(cmd, shell=True)
            process.communicate()
            print 'Thread finished'
            cmd_queue.task_done()


# create list of commands you want to run
cmds = ['cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop','cd /home/nater/Desktop']
# create class
c = Command(cmds)
# run them...
c.run_cmds()

这将打印....
Thread started
Thread started
 Thread started
 Thread startedThread finished

Thread started
Thread finishedThread finished

Thread finished
Thread finished

从输出中可以看出,子进程独立地启动和结束,并且没有任何一个子进程等待另一个子进程完成,因为它们都在不同的线程中调用。当然,您可以添加超时和其他任何您想要的内容,这只是一个简单的示例。这假设您知道要运行的所有命令。如果您想添加线程超时,请参见epicbrews的答案。如果您想要,您可以将他的线程超时示例合并到此示例中。

就像我在示例中所做的那样? :P 尽管我没有像你描述得那么清楚。 - Torxed
实际上,在我回答时,您的示例中有process.communicate()。否则我不会回答的。我在编辑历史记录中看到您已将其删除。 - b10hazard
是的,但在你发帖之前我就尽快将其删除了,因为我只是将他的代码粘贴到那里以便在我的连接断开之前(在火车上,所以每2分钟就会断开)将其放在那里。 :) - Torxed
啊,我明白了。我最终还是删除了那个建议。这似乎会与 OP 所尝试的目标背道而驰。 - b10hazard

0
from threading import *
from time import time
import shlex
import subprocess
from random import randint
class Worker(Thread):
    def __init__(self, param, cmd, timeout=10):
        self.cmd = cmd
        self.timeout = timeout

        Thread.__init__(self)
        self.name = param
    def run(self):
        startup = time()
        print(self.name + ' is starting')

        args = shlex.split(self.cmd)
        #Shell should be false when given a list (True for strings)
        process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

        while time()-startup <= self.timeout:
            if process.poll() != None:
                break

        process.stdout.close()
        process.stdin.close()
        process.stderr.close()

        print(self.name + ' is dead')

for i in range(0, 100):
    x = Worker('Name-'+str(i), 'ping -n ' + str(randint(0,5)) + ' www.google.se')
    x.start()

while len(enumerate()) > 1:
    pass # Wait for the threads to die

这会简化你的工作方法吗? 特别是考虑到你不需要等待结果,这只是将一个类对象启动到外部空间为你执行工作,并设置超时时间。

还要注意:

  • 不关闭stdout、stdin和stderr会导致几乎所有系统上出现“打开太多文件句柄”的错误
  • 正如另一个答案中指出的那样,.communicate()会等待进程退出(使用.poll()代替)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接