用Python进行基本终端仿真

4

我一直在尝试编写一个基本的终端仿真脚本,因为我在我的Mac上无法使用终端。但是,在Blender中编写游戏引擎脚本时,通常在启动Blender的终端中打开的控制台至关重要。

对于简单的操作,如删除、重命名等,我过去使用stream = os.popen(command)执行命令,然后使用print(stream.read())输出结果。这对大多数事情都有效,但对于交互式操作则不行。

不久前,我发现了一种新的方法:sp = subprocess.Popen(["/bin/bash", "-i"], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE),然后使用print(sp.communicate(command.encode()))。这应该会生成一个类似终端的交互式shell,对吗?

但无论如何,我无法保持连接处于打开状态,使用最后一个示例,我只能调用sp.communicate一次,给出以下输出(在这种情况下为'ls /')和一些错误:
(b'Applications\n[...]usr\nvar\n', b'bash: no job control in this shell\nbash-3.2$ ls /\nbash-3.2$ exit\n')。第二次调用将出现ValueError: I/O operation on closed file.有时(例如对于'ls'),我只会收到这个错误:b'ls\nbash-3.2$ exit\n'

那是什么意思?如何使用Python模拟终端,以便控制交互式shell或运行Blender并与控制台通信?


Blender不能允许您从运行的进程中打开自己的终端窗口吗? - JAB
据我所知,在Mac上,如果你需要控制台,你需要直接打开可执行文件来启动Blender。默认情况下,它会启动终端。但是,无论如何,我都无法打开任何终端窗口,因为我的Mac已激活了家长控制。但我很确定,在Blender中编写脚本不是管理员想限制的内容。 - lucaba
1
您可能可以使用或获取适用于Mac的Terminal.app。 - user1277476
3个回答

10

假设您想要一个交互式shell,它会持续请求输入,您可以尝试以下内容:

import subprocess
import re

while True:
    # prevents lots of python error output
    try:
        s = raw_input('> ')
    except:
        break

    # check if you should exit
    if s.strip().lower() == 'exit':
        break

    # try to run command
    try:
        cmd = subprocess.Popen(re.split(r'\s+', s), stdout=subprocess.PIPE)
        cmd_out = cmd.stdout.read()

        # Process output
        print cmd_out

    except OSError:
        print 'Invalid command'

谢谢,我试过了,但实际上我需要控制台的输出来调试脚本。但是这个脚本只执行命令。 - lucaba
好的建议。我相信即使有这些更改,代码仍然足够简单易懂。 - ryucl0ud
我不知道是我理解错了什么,还是没有解释清楚,但我已经成功做到了这样的事情,但我的问题是我不知道如何与blender通信。当我以这种方式启动blender时,它只会阻止脚本运行,但没有调试数据被打印出来。 - lucaba
@ryucl0ud,我刚刚发现你的脚本确实有效,但是信息只在退出blender时打印。你知道如何解决这个问题吗?谢谢。 - lucaba
@lucaba 我不太熟悉Blender如何处理Python脚本。也许它会采用一些奇怪的缓冲形式。在循环内打印后,您可以尝试执行import syssys.stdout.flush()来使其正常工作。 - ryucl0ud
显示剩余3条评论

5

这是我为了在Windows上实现你想要的功能而努力工作的东西。这是一个更加困难的问题,因为Windows不遵循任何标准,只遵循自己的标准。稍微修改一下这段代码应该就能给你想要的结果。

'''
Created on Mar 2, 2013

@author: rweber
'''
import subprocess
import Queue
from Queue import Empty
import threading


class Process_Communicator():

    def join(self):
        self.te.join()
        self.to.join()
        self.running = False
        self.aggregator.join()

    def enqueue_in(self):
        while self.running and self.p.stdin is not None:
            while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')
            pass

    def enqueue_output(self):
        if not self.p.stdout or self.p.stdout.closed:
            return
        out = self.p.stdout
        for line in iter(out.readline, b''):
            self.qo.put(line)

    def enqueue_err(self):
        if not self.p.stderr or self.p.stderr.closed:
            return
        err = self.p.stderr
        for line in iter(err.readline, b''):
            self.qe.put(line)

    def aggregate(self):
        while (self.running):
            self.update()
        self.update()

    def update(self):
        line = ""
        try:
            while self.qe.not_empty:
                line = self.qe.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_err += line
        except Empty:
            pass

        line = ""
        try:
            while self.qo.not_empty:
                line = self.qo.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_out += line
        except Empty:
            pass

        while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')

    def get_stdout(self, clear=True):
        ret = self.unbblocked_out
        if clear:
            self.unbblocked_out = ""
        return ret

    def has_stdout(self):
        ret = self.get_stdout(False)
        if ret == '':
            return None
        else:
            return ret

    def get_stderr(self, clear=True):
        ret = self.unbblocked_err
        if clear:
            self.unbblocked_err = ""
        return ret

    def has_stderr(self):
        ret = self.get_stderr(False)
        if ret == '':
            return None
        else:
            return ret

    def __init__(self, subp):
        '''This is a simple class that collects and aggregates the
        output from a subprocess so that you can more reliably use
        the class without having to block for subprocess.communicate.'''
        self.p = subp
        self.unbblocked_out = ""
        self.unbblocked_err = ""
        self.running = True
        self.qo = Queue.Queue()
        self.to = threading.Thread(name="out_read",
                                    target=self.enqueue_output,
                                    args=())
        self.to.daemon = True  # thread dies with the program
        self.to.start()

        self.qe = Queue.Queue()
        self.te = threading.Thread(name="err_read",
                                   target=self.enqueue_err,
                                   args=())
        self.te.daemon = True  # thread dies with the program
        self.te.start()

        self.stdin_queue = Queue.Queue()
        self.aggregator = threading.Thread(name="aggregate",
                                           target=self.aggregate,
                                           args=())
        self.aggregator.daemon = True  # thread dies with the program
        self.aggregator.start()
        pass
def write_stdin(p,c):
    while p.poll() == None:
        i = raw_input("send to process:")
        if i is not None:
            c.stdin_queue.put(i)


p = subprocess.Popen("cmd.exe", shell=True, stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE, stdin=subprocess.PIPE)
c = Process_Communicator(p)
stdin = threading.Thread(name="write_stdin",
                           target=write_stdin,
                           args=(p,c))
stdin.daemon = True  # thread dies with the program
stdin.start()
while p.poll() == None:
    if c.has_stdout():
        print c.get_stdout()
    if c.has_stderr():
        print c.get_stderr()

c.join()
print "Exit"

1

看起来你应该在新的控制终端上运行它,使用“forkpty”分配。 为了抑制警告“没有作业控制...”,你需要调用“setsid”。


谢谢您的回复,但我不太熟悉这两个命令,您能否解释一下它们以及为什么我应该使用它们。谢谢。 - lucaba
import pty; pid, master = pty.fork(); if not pid: os.execlp("/bin/sh", "/bin/sh", "-c", "cd $HOME && exec %s" % command) - kefir_
1
在pty.fork()中调用fork、openpty和setsid。要获取响应,使用os.read(master, length)。 - kefir_
谢谢。但我仍然不明白它的作用,请解释一下。无论如何,我已经点赞了,因为它对我有帮助。 - lucaba

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接