在进程中安全运行代码,多线程中重定向stdout

8
我正在处理一个来自MOOC的数据集。我有很多Python3代码片段需要运行并获取结果。为此,我编写了一个Python脚本来循环执行每个代码片段。对于每个代码片段,我会:
  1. 创建新的StringIO对象
  2. sys.stdoutsys.stderr设置为我的StringIO缓冲区
  3. threading.thread对象中执行代码片段
  4. 等待线程结束
  5. 将结果记录在StringIO缓冲区中
  6. 恢复stdout和stderr
这对于“正确”的代码来说没问题,但在其他情况下会遇到问题:
  • 当代码有无限循环时,thread.join不能杀死线程。由于该线程是守护进程,因此它会在后台静默运行,直到我的循环完成。
  • 当代码有一个带有print()的无限循环时,当我将它从StringIO缓冲区切换回默认值(即不从StringIO缓冲区输出)时,该线程开始覆盖我的实际stdout。这会污染我的报告。
以下是我的当前代码:
def execCode(code, testScript=None):
    # create file-like string to capture output
    codeOut = io.StringIO()
    codeErr = io.StringIO()

    # capture output and errors
    sys.stdout = codeOut
    sys.stderr = codeErr

    def worker():
        exec(code, globals())

        if testScript:
            # flush stdout/stderror
            sys.stdout.truncate(0)
            sys.stdout.seek(0)
            # sys.stderr.truncate(0)
            # sys.stderr.seek(0)
            exec(testScript)

    thread = threading.Thread(target=worker, daemon=True)
    # thread = Process(target=worker) #, stdout=codeOut, stderr=codeErr)
    thread.start()
    thread.join(0.5)  # 500ms

    execError = codeErr.getvalue().strip()
    execOutput = codeOut.getvalue().strip()

    if thread.is_alive():
        thread.terminate()
        execError = "TimeError: run time exceeded"

    codeOut.close()
    codeErr.close()

    # restore stdout and stderr
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__

    # restore any overridden functions
    restoreBuiltinFunctions()

    if execError:
        return False, stripOuterException(execError)
    else:
        return True, execOutput

为了处理这种情况,我一直在尝试使用 multithreading.Process 和/或 contextlib.redirect_stdout 在进程中运行代码(然后我可以调用 process.terminate()),但是我没有成功捕获 stdout/stderr。
所以我的问题是:如何重定向或捕获进程的 stdout/stderr?另外,有什么其他方法可以尝试运行和捕获任意代码的输出?
(是的,我知道这一般来说是个坏主意;我是在虚拟机中运行它,以防万一里面有恶意代码)
Python 版本为 3.5.3
更新
我意识到在这种情况下还有一些小小的灵活性。我有一个函数,preprocess(code) 接受代码提交作为字符串并对其进行更改。大多数情况下,我一直在使用正则表达式交换某些变量的值。
以下是一个示例实现:
def preprocess(code):
    import re
    rx = re.compile('earlier_date\s*=\s*.+')
    code = re.sub(rx, "earlier_date = date(2016, 5, 3)", code)
    rx = re.compile('later_date\s*=\s*.+')
    code = re.sub(rx, "later_date = date(2016, 5, 24)", code)
    return code

我可以使用预处理函数帮助重定向STDOUT。

你考虑过“日志记录”吗? - igrinis
1
我有一些日志记录。我的当前解决方法是在遇到无限循环时打开日志记录。我使用日志记录来追踪并删除有问题的代码片段,但这是一个无法真正自动化的手动过程(如果我能自动化它,我就不需要记录任何东西,我可以中止、删除记录并继续)。 - Zack
1
subprocess.check_output 怎么样?你可以使用它来调用 python -c {snippet},或者如果代码比较长,可以将代码片段写入临时的 .py 文件中。check_output(以及 subprocess 中的所有其他函数)都有一个 timeout 参数。 - David Nemeskey
2个回答

3

在Python中,与正在运行的进程通信并不是一件容易的事情。由于某些原因,您只能在子进程的生命周期中执行一次。从我的经验来看,最好运行一个线程来启动进程,在超时后获取其输出并终止子进程。

像这样:

def subprocess_with_timeout(cmd, timeout_sec, stdin_data=None):
    """Execute `cmd` in a subprocess and enforce timeout `timeout_sec` seconds.

    Send `stdin_data` to the subprocess.

    Return subprocess exit code and outputs on natural completion of the subprocess.
    Raise an exception if timeout expires before subprocess completes."""
    proc = os.subprocess.Popen(cmd,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
    timer = threading.Timer(timeout_sec, proc.kill)
    # this will terminate subprocess after timeout
    timer.start()

    # you will be blocked here until process terminates (by itself or by timeout death switch)
    stdoutdata, stderrdata = proc.communicate(stdin_data) 

    if timer.is_alive():
        # Process completed naturally - cancel timer and return exit code
        timer.cancel()
        return proc.returncode, stdoutdata, stderrdata
    # Process killed by timer - raise exception
    raise TimeoutError('Process #%d killed after %f seconds' % (proc.pid, timeout_sec))

所以,运行一个线程执行器来调用 subprocess_with_timeout。它应该处理输入并保存结果。
另一个想法是使用Web服务器进行IPC。参见此链接

0

关于subprocess.check_output怎么样?你可以使用它来调用python -c {snippet},或者如果代码比较长,可以将代码片段写入临时的.py文件中。 check_output(以及subprocess中的所有其他函数)都有一个timeout参数。

总体思路如下:

import subprocess
import sys

def execCode(code):
    try:
        output = subprocess.check_output([sys.executable, '-c', code],
                                         stdin=subprocess.PIPE,
                                         stderr=subprocess.PIPE,
                                         timeout=0.5)
        return True, output
    except subprocess.TimeoutExpired as te:
        return False, 'run time exceeded'
    except subprocess.CalledProcessError as cpe:
        return False, cpe.stderr

示例在IPython中运行:

In [18]: execCode('import os\nprint(" ".join(os.listdir()))')
Out[18]:
(True,
 b'contents of directory\n')

In [19]: execCode('import time\ntime.sleep(1)')
Out[19]: (False, 'run time exceeded')

In [20]: execCode('import os\nprint("\t".join(os.listdi))')
Out[20]: 
(False,
 b'Traceback (most recent call last):\n  File "<string>", line 2, in <module>\nAttributeError: module \'os\' has no attribute \'listdi\'\n')

请注意,check_output返回一个bytes序列,因此您需要将其转换为str。或者您可以使用check_outputencoding参数。这与编程有关。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接