Python subprocess实时输入和多个控制台

22

主要问题

简而言之:我想要两个控制台来运行我的程序。一个用于用户输入,另一个用于纯日志输出。 (包括被接受答案的工作代码在下面的“编辑-3”章节中。而在“编辑-1”和“编辑-2”章节中有可行的解决方案。)

为此,我有一个主命令行Python脚本,它应该打开一个额外的控制台,仅用于日志输出。为此,我打算重定向日志输出,它将会被打印到主脚本的控制台上,然后通过子进程启动的第二个控制台的标准输入流传递给它。(我使用子进程,因为我没有找到其他打开第二个控制台的方法。)

问题是,似乎我能够将内容发送到这个第二个控制台的标准输入流,但是在这个第二个控制台上却没有任何内容被打印出来。

下面是我用于实验的代码(在Windows 10下使用PyDev的Python 3.4)。函数writing(input, pipe, process) 包含了将生成的字符串复制到通过子进程打开的控制台的pipe标准输入流的部分。函数writing(...) 通过类writetest(Thread) 运行。(我留下了一些我注释掉的代码。)

import os
import sys
import io
import time
import threading
from cmd import Cmd
from queue import Queue
from subprocess import Popen, PIPE, CREATE_NEW_CONSOLE


REPETITIONS = 3


# Position of "The class" (Edit-2)


# Position of "The class" (Edit-1)


class generatetest(threading.Thread):

    def __init__(self, queue):
        self.output = queue
        threading.Thread.__init__(self)

    def run(self):
        print('run generatetest')
        generating(REPETITIONS, self.output)
        print('generatetest done')

    def getout(self):
        return self.output


class writetest(threading.Thread):

    def __init__(self, input=None, pipe=None, process=None):
        if (input == None):        # just in case
            self.input = Queue()
        else:
            self.input = input

        if (pipe == None):        # just in case
            self.pipe = PIPE
        else:
            self.pipe = pipe

        if (process == None):        # just in case
            self.process = subprocess.Popen('C:\Windows\System32\cmd.exe', universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
        else:
            self.process = proc

        threading.Thread.__init__(self)

    def run(self):
        print('run writetest')
        writing(self.input, self.pipe, self.process)
        print('writetest done')


# Position of "The function" (Edit-2)


# Position of "The function" (Edit-1)


def generating(maxint, outline):
    print('def generating')
    for i in range(maxint):
        time.sleep(1)
        outline.put_nowait(i)


def writing(input, pipe, process):
    print('def writing')
    while(True):
        try:
            print('try')
            string = str(input.get(True, REPETITIONS)) + "\n"
            pipe = io.StringIO(string)
            pipe.flush()
            time.sleep(1)
            # print(pipe.readline())
        except:
            print('except')
            break
        finally:
            print('finally')
            pass


data_queue = Queue()
data_pipe = sys.stdin
# printer = sys.stdout
# data_pipe = os.pipe()[1]


# The code of 'C:\\Users\\Public\\Documents\\test\\test-cmd.py'
# can be found in the question's text further below under "More code"


exe = 'C:\Python34\python.exe'
# exe = 'C:\Windows\System32\cmd.exe'
arg = 'C:\\Users\\Public\\Documents\\test\\test-cmd.py'
arguments = [exe, arg]
# proc = Popen(arguments, universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
proc = Popen(arguments, stdin=data_pipe, stdout=PIPE, stderr=PIPE,
             universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)


# Position of "The call" (Edit-2 & Edit-1) - file init (proxyfile)


# Position of "The call" (Edit-2) - thread = sockettest()
# Position of "The call" (Edit-1) - thread0 = logtest()
thread1 = generatetest(data_queue)
thread2 = writetest(data_queue, data_pipe, proc)
# time.sleep(5)


# Position of "The call" (Edit-2) - thread.start()
# Position of "The call" (Edit-1) - thread0.start()
thread1.start()
thread2.start()


# Position of "The call" (Edit-2) - thread.join()
# Position of "The call" (Edit-1) - thread.join()
thread1.join(REPETITIONS * REPETITIONS)
thread2.join(REPETITIONS * REPETITIONS)

# data_queue.join()
# receiver = proc.communicate(stdin, 5)
# print('OUT:' + receiver[0])
# print('ERR:' + receiver[1])

print("1st part finished")

稍微不同的方法

以下额外的代码片段可以提取子进程的标准输出。但是,先前发送的标准输入仍未在第二个控制台上打印。此外,第二个控制台会立即关闭。


proc2 = Popen(['C:\Python34\python.exe', '-i'],
              stdin=PIPE,
              stdout=PIPE,
              stderr=PIPE,
              creationflags=CREATE_NEW_CONSOLE)
proc2.stdin.write(b'2+2\n')
proc2.stdin.flush()
print(proc2.stdout.readline())
proc2.stdin.write(b'len("foobar")\n')
proc2.stdin.flush()
print(proc2.stdout.readline())
time.sleep(1)
proc2.stdin.close()
proc2.terminate()
proc2.wait(timeout=0.2)

print("Exiting Main Thread")

更多信息

一旦我使用参数stdin=data_pipe, stdout=PIPE, stderr=PIPE启动子进程中的一个,结果第二个控制台不活动并且不能接受键盘输入(虽然这不是期望的,但这可能在此处提供有用的信息)。

该子进程方法communicate()无法用于此操作,因为它等待进程结束。


更多代码

最后,文件的代码,用于第二个控制台。

C:\Users\Public\Documents\test\test-cmd.py

from cmd import Cmd
from time import sleep
from datetime import datetime

INTRO = 'command line'
PROMPT = '> '


class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, intro=INTRO, prompt=PROMPT):
        Cmd.__init__(self)
        self.intro = intro
        self.prompt = prompt
        self.doc_header = intro
        self.running = False

    def do_dummy(self, args):
        """Runs a dummy method."""
        print("Do the dummy.")
        self.running = True
        while(self.running == True):
            print(datetime.now())
            sleep(5)

    def do_stop(self, args):
        """Stops the dummy method."""
        print("Stop the dummy, if you can.")
        self.running = False

    def do_exit(self, args):
        """Exits this console."""
        print("Do console exit.")
        exit()

if __name__ == '__main__':
    cl = CommandLine()
    cl.prompt = PROMPT
    cl.cmdloop(INTRO)

想法

到目前为止,我甚至不确定Windows命令行界面是否提供接受除键盘输入以外的其他输入的能力(而不是所需的stdin管道或类似的)。尽管它有某种被动模式,但我还是期望它能够实现。

为什么这不起作用?


编辑-1:通过文件绕过(概念验证)

在Python中使用多个控制台答案中建议的那样,使用文件作为解决方法以显示其新内容,总体上是可行的。然而,由于日志文件将增长到数GB,因此在本例中它不是一个实用的解决方案。这至少需要文件分割和适当的处理。

该类:


class logtest(threading.Thread):

    def __init__(self, file):
        self.file = file
        threading.Thread.__init__(self)

    def run(self):
        print('run logtest')
        logging(self.file)
        print('logtest done')

这个函数:

def logging(file):
    pexe = 'C:\Python34\python.exe '
    script = 'C:\\Users\\Public\\Documents\\test\\test-004.py'
    filek = '--file'
    filev = file

    file = open(file, 'a')
    file.close()
    time.sleep(1)

    print('LOG START (outer): ' + script + ' ' + filek + ' ' + filev)
    proc = Popen([pexe, script, filek, filev], universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
    print('LOG FINISH (outer): ' + script + ' ' + filek + ' ' + filev)

    time.sleep(2)

这个调用:

# The file tempdata is filled with several strings of "0\n1\n2\n"
# Looking like this:
# 0
# 1
# 2
# 0
# 1
# 2

proxyfile = 'C:\\Users\\Public\\Documents\\test\\tempdata'
f = open(proxyfile, 'a')
f.close()
time.sleep(1)

thread0 = logtest(proxyfile)
thread0.start()
thread0.join(REPETITIONS * REPETITIONS)

尾部脚本(“test-004.py”):

由于Windows系统没有tail命令,所以我使用了以下脚本代替(基于回答如何实现Python的tail -F等效命令?),这对我来说很有效。额外的、有点不必要的class CommandLine(Cmd)最初是为了保持第二个控制台打开(因为缺少脚本文件参数)。然而,它也被证明对于使控制台流畅地打印新的日志文件内容非常有用。否则输出就不是确定性的或可预测的。

import time
import sys
import os
import threading
from cmd import Cmd
from argparse import ArgumentParser


def main(args):
    parser = ArgumentParser(description="Parse arguments.")
    parser.add_argument("-f", "--file", type=str, default='', required=False)
    arguments = parser.parse_args(args)

    if not arguments.file:
        print('LOG PRE-START (inner): file argument not found. Creating new default entry.')
        arguments.file = 'C:\\Users\\Public\\Documents\\test\\tempdata'

    print('LOG START (inner): ' + os.path.abspath(os.path.dirname(__file__)) + ' ' + arguments.file)

    f = open(arguments.file, 'a')
    f.close()
    time.sleep(1)

    words = ['word']
    console = CommandLine(arguments.file, words)
    console.prompt = ''

    thread = threading.Thread(target=console.cmdloop, args=('', ))
    thread.start()
    print("\n")

    for hit_word, hit_sentence in console.watch():
        print("Found %r in line: %r" % (hit_word, hit_sentence))

    print('LOG FINISH (inner): ' + os.path.abspath(os.path.dirname(__file__)) + ' ' + arguments.file)


class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, fn, words):
        Cmd.__init__(self)
        self.fn = fn
        self.words = words

    def watch(self):
        fp = open(self.fn, 'r')
        while True:
            time.sleep(0.05)
            new = fp.readline()
            print(new)
            # Once all lines are read this just returns ''
            # until the file changes and a new line appears

            if new:
                for word in self.words:
                    if word in new:
                        yield (word, new)

            else:
                time.sleep(0.5)


if __name__ == '__main__':
    print('LOG START (inner - as main).')
    main(sys.argv[1:])

编辑-1: 更多想法

三个我尚未尝试过但可能有效的解决方法是sockets(也建议在这个答案中使用在Python中工作多个控制台),通过进程ID获取进程对象以获得更多控制权,并使用ctypes库直接访问Windows控制台API,允许设置屏幕缓冲区,因为控制台可以有多个缓冲区,但只有一个活动缓冲区(在CreateConsoleScreenBuffer函数的文档注释中说明)。

然而,使用sockets可能是最简单的方法。并且这种方式不会受到日志大小的影响。尽管,在此处可能会遇到连接问题。


编辑-2: 通过sockets的解决方法(概念证明)

在Python中工作多个控制台的回答中建议的那样,使用sockets作为解决方法以显示新的日志条目,总的来说也是有效的。不过,这似乎对于应该简单地发送到接收控制台进程的内容来说,需要付出太多的努力。

类:

class sockettest(threading.Thread):

    def __init__(self, host, port, file):
        self.host = host
        self.port = port
        self.file = file
        threading.Thread.__init__(self)

    def run(self):
        print('run sockettest')
        socketing(self.host, self.port, self.file)
        print('sockettest done')

这个函数:

def socketing(host, port, file):
    pexe = 'C:\Python34\python.exe '
    script = 'C:\\Users\\Public\\Documents\\test\test-005.py'
    hostk = '--address'
    hostv = str(host)
    portk = '--port'
    portv = str(port)
    filek = '--file'
    filev = file

    file = open(file, 'a')
    file.close()
    time.sleep(1)

    print('HOST START (outer): ' + pexe + script + ' ' + hostk + ' ' + hostv + ' ' + portk + ' ' + portv + ' ' + filek + ' ' + filev)
    proc = Popen([pexe, script, hostk, hostv, portk, portv, filek, filev], universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)

    print('HOST FINISH (outer): ' + pexe + script + ' ' + hostk + ' ' + hostv + ' ' + portk + ' ' + portv + ' ' + filek + ' ' + filev)

    time.sleep(2)

调用:

# The file tempdata is filled with several strings of "0\n1\n2\n"
# Looking like this:
# 0
# 1
# 2
# 0
# 1
# 2

proxyfile = 'C:\\Users\\Public\\Documents\\test\\tempdata'
f = open(proxyfile, 'a')
f.close()
time.sleep(1)

thread = sockettest('127.0.0.1', 8888, proxyfile)
thread.start()
thread.join(REPETITIONS * REPETITIONS)

套接字脚本(“test-005.py”):

以下脚本基于Python:使用线程进行套接字编程的服务器客户端应用程序。这里我只保留了class CommandLine(Cmd)作为日志条目生成器。此时,将客户端放入主脚本中并调用第二个控制台,而不是(新的)文件行,然后向队列提供真实的日志条目不应该成为问题。(服务器是打印机。)

import socket
import sys
import threading
import time
from cmd import Cmd
from argparse import ArgumentParser
from queue import Queue

BUFFER_SIZE = 5120

class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, fn, words, queue):
        Cmd.__init__(self)
        self.fn = fn
        self.words = words
        self.queue = queue

    def watch(self):
        fp = open(self.fn, 'r')
        while True:
            time.sleep(0.05)
            new = fp.readline()

            # Once all lines are read this just returns ''
            # until the file changes and a new line appears
            self.queue.put_nowait(new)


def main(args):
    parser = ArgumentParser(description="Parse arguments.")
    parser.add_argument("-a", "--address", type=str, default='127.0.0.1', required=False)
    parser.add_argument("-p", "--port", type=str, default='8888', required=False)
    parser.add_argument("-f", "--file", type=str, default='', required=False)
    arguments = parser.parse_args(args)

    if not arguments.address:
        print('HOST PRE-START (inner): host argument not found. Creating new default entry.')
        arguments.host = '127.0.0.1'
    if not arguments.port:
        print('HOST PRE-START (inner): port argument not found. Creating new default entry.')
        arguments.port = '8888'
    if not arguments.file:
        print('HOST PRE-START (inner): file argument not found. Creating new default entry.')
        arguments.file = 'C:\\Users\\Public\\Documents\\test\\tempdata'

    file_queue = Queue()

    print('HOST START (inner): ' + ' ' + arguments.address + ':' + arguments.port + ' --file ' + arguments.file)

    # Start server
    thread = threading.Thread(target=start_server, args=(arguments.address, arguments.port, ))
    thread.start()
    time.sleep(1)

    # Start client
    thread = threading.Thread(target=start_client, args=(arguments.address, arguments.port, file_queue, ))
    thread.start()

    # Start file reader
    f = open(arguments.file, 'a')
    f.close()
    time.sleep(1)

    words = ['word']
    console = CommandLine(arguments.file, words, file_queue)
    console.prompt = ''

    thread = threading.Thread(target=console.cmdloop, args=('', ))
    thread.start()
    print("\n")

    for hit_word, hit_sentence in console.watch():
        print("Found %r in line: %r" % (hit_word, hit_sentence))

    print('HOST FINISH (inner): ' + ' ' + arguments.address + ':' + arguments.port)


def start_client(host, port, queue):
    host = host
    port = int(port)         # arbitrary non-privileged port
    queue = queue

    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        soc.connect((host, port))
    except:
        print("Client connection error" + str(sys.exc_info()))
        sys.exit()

    print("Enter 'quit' to exit")
    message = ""

    while message != 'quit':
        time.sleep(0.05)
        if(message != ""):
            soc.sendall(message.encode("utf8"))
            if soc.recv(BUFFER_SIZE).decode("utf8") == "-":
                pass        # null operation

        string = ""
        if (not queue.empty()):
            string = str(queue.get_nowait()) + "\n"

        if(string == None or string == ""):
            message = ""
        else:
            message = string

    soc.send(b'--quit--')


def start_server(host, port):
    host = host
    port = int(port)         # arbitrary non-privileged port

    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_REUSEADDR flag tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire
    soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    print("Socket created")

    try:
        soc.bind((host, port))
    except:
        print("Bind failed. Error : " + str(sys.exc_info()))
        sys.exit()

    soc.listen(5)       # queue up to 5 requests
    print("Socket now listening")

    # infinite loop- do not reset for every requests
    while True:
        connection, address = soc.accept()
        ip, port = str(address[0]), str(address[1])
        print("Connected with " + ip + ":" + port)

        try:
            threading.Thread(target=client_thread, args=(connection, ip, port)).start()
        except:
            print("Thread did not start.")
            traceback.print_exc()

    soc.close()


def client_thread(connection, ip, port, max_buffer_size=BUFFER_SIZE):
    is_active = True

    while is_active:
        client_input = receive_input(connection, max_buffer_size)

        if "--QUIT--" in client_input:
            print("Client is requesting to quit")
            connection.close()
            print("Connection " + ip + ":" + port + " closed")
            is_active = False
        elif not client_input == "":
            print("{}".format(client_input))
            connection.sendall("-".encode("utf8"))
        else:
            connection.sendall("-".encode("utf8"))


def receive_input(connection, max_buffer_size):
    client_input = connection.recv(max_buffer_size)
    client_input_size = sys.getsizeof(client_input)

    if client_input_size > max_buffer_size:
        print("The input size is greater than expected {}".format(client_input_size))

    decoded_input = client_input.decode("utf8").rstrip()  # decode and strip end of line
    result = process_input(decoded_input)

    return result


def process_input(input_str):
    return str(input_str).upper()


if __name__ == '__main__':
    print('HOST START (inner - as main).')
    main(sys.argv[1:])

另外的想法

直接控制子进程的控制台输入管道/缓冲区将是解决这个问题的首选方案。因此奖励500声望值。

不幸的是,我时间有限。因此我可能会现在使用其中一个解决方法,并稍后用正确的解决方案替换它们。或者我可能必须使用核选项,只有一个控制台,在任何用户键盘输入期间暂停正在进行的日志输出,并在之后打印。当然,如果用户决定只输入一半内容,这可能导致缓冲区问题。


包括已被接受的答案的代码(一个文件)

使用James Kent的答案,当我通过Windows命令行(cmd)或PowerShell启动脚本时,可以得到所需的行为。但是,当我通过Eclipse/PyDev使用“Python运行”启动相同的脚本时,输出始终打印在主Eclipse/PyDev控制台上,而子进程的第二个控制台保持空白并保持不活动状态。虽然我猜这是另一个系统/环境特殊性以及不同的问题。

from sys import argv, stdin, stdout
from threading import Thread
from cmd import Cmd
from time import sleep
from datetime import datetime
from subprocess import Popen, PIPE, CREATE_NEW_CONSOLE

INTRO = 'command line'
PROMPT = '> '


class CommandLine(Cmd):
    """Custom console"""

    def __init__(self, subprocess, intro=INTRO, prompt=PROMPT):
        Cmd.__init__(self)
        self.subprocess = subprocess
        self.intro = intro
        self.prompt = prompt
        self.doc_header = intro
        self.running = False

    def do_date(self, args):
        """Prints the current date and time."""
        print(datetime.now())
        sleep(1)

    def do_exit(self, args):
        """Exits this command line application."""
        print("Exit by user command.")
        if self.subprocess is not None:
            try:
                self.subprocess.terminate()
            except:
                self.subprocess.kill()
        exit()


class Console():

    def __init__(self):
        if '-r' not in argv:
            self.p = Popen(
                ['python.exe', __file__, '-r'],
                stdin=PIPE,
                creationflags=CREATE_NEW_CONSOLE
            )
        else:
            while True:
                data = stdin.read(1)
                if not data:
                    #                     break
                    sleep(1)
                    continue
                stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

    def getSubprocess(self):
        if self.p:
            return self.p
        else:
            return None


class Feeder (Thread):

    def __init__(self, console):
        self.console = console
        Thread.__init__(self)

    def run(self):
        feeding(self.console)


def feeding(console):
    for i in range(0, 100):
        console.write('test %i\n' % i)
        sleep(1)


if __name__ == '__main__':
    p = Console()
    if '-r' not in argv:
        thread = Feeder(p)
        thread.setDaemon(True)
        thread.start()

        cl = CommandLine(subprocess=p.getSubprocess())
        cl.use_rawinput = False
        cl.prompt = PROMPT
        cl.cmdloop('\nCommand line is waiting for user input (e.g. help).')

编辑3:荣誉提及

在上面的问题文本中,我已经提到使用ctypes库直接访问Windows控制台API作为另一种解决方法(在“编辑1:更多想法”下)。或者以某种方式只使用一个控制台,使输入提示始终保持在底部,作为整个问题的核心选择。 (在“编辑2:进一步思考”下)

对于使用ctypes库,我会参考以下答案:在Windows中更改控制台字体。而对于仅使用一个控制台,我会尝试以下答案:保持控制台输入行在输出下方。我认为这两个答案都可能对这个问题有潜在的好处,可能对其他遇到这篇文章的人有所帮助。此外,如果我有时间,我将尝试看看它们是否能以某种方式工作。


编辑1终于成功了(正如在https://dev59.com/tnPYa4cB1Zd3GeqPpPMp?answertab=active#answer-17405766中所建议的)。不过,这更多是一个概念验证,因为潜在的日志文件将会增长到很多GB。 - Jonathan Root
Edit-2 也可以工作(套接字解决方法,也在这里建议 https://dev59.com/tnPYa4cB1Zd3GeqPpPMp?answertab=active#answer-17405766)。但是,这又是一个不必要的复杂绕路或黑客行为,而不是对子进程输入或输出进行干净访问。毕竟,它在一台机器上运行,所有东西都在 RAM 中。 - Jonathan Root
请将以下与编程有关的内容从英语翻译成中文。只返回翻译文本:为了澄清,您的问题开头明确表示您希望将其用作显示输出,但在中途您谈到它不接受键盘输入,那么您实际需要从这第二个控制台中得到什么? - James Kent
第二个控制台仅用于日志输出。然而,在我的测试中,我注意到它不接受stdin、stdout或stderr设置为PIPE的输入。如果相关,我将此添加为额外信息。 - Jonathan Root
编辑-3:这段代码包括被接受的答案,当我通过Windows命令行(cmd)或PowerShell启动脚本时,它按预期工作。只有当我通过Eclipse/PyDev使用“Python运行”启动相同的脚本时,输出会打印到主Eclipse/PyDev控制台上,而子进程的第二个控制台保持空白且不活动。虽然我猜这是另一个系统/环境特殊性和不同的问题。 - Jonathan Root
2个回答

14
您面临的问题是Windows控制台子系统的架构,在通常看到的控制台窗口不是由cmd.exe托管,而是由conhost.exe托管。一个conhost窗口的子进程只能连接到单个conhost实例,这意味着您每个进程只能限制为一个窗口。
因此,如果您想要拥有额外的控制台窗口,则需要为每个窗口创建一个额外的进程,然后要查看如何处理stdin和stdout以显示任何内容,通常情况下它们都是由conhost实例写入并读取。但是,如果将stdin转换为管道(以便可以向进程写入数据),则不再从conhost获得输入,而是从父进程获得输入,因此conhost无法获取它。这意味着写入stdin的任何内容仅由子进程读取,因此不会被conhost显示。
据我所知,没有像那样共享管道的方法。
作为副作用,如果将stdin转换为管道,则发送到新控制台窗口的所有键盘输入都会无效,因为stdin未连接到该窗口。
对于仅输出功能,这意味着您可以生成一个新进程,该进程通过与父进程进行管道通信,并将所有内容回显到stdout。
以下是一种尝试:
#!python3

import sys, subprocess, time

class Console():
    def __init__(self):
        if '-r' not in sys.argv:
            self.p = subprocess.Popen(
                ['python.exe', __file__, '-r'],
                stdin=subprocess.PIPE,
                creationflags=subprocess.CREATE_NEW_CONSOLE
                )
        else:
            while True:
                data = sys.stdin.read(1)
                if not data:
                    break
                sys.stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

if (__name__ == '__main__'):
    p = Console()
    if '-r' not in sys.argv:
        for i in range(0, 100):
            p.write('test %i\n' % i)
            time.sleep(1)

所以,简单的管道可以在两个进程之间进行,并且如果它是子进程,则将输入回显到输出。我使用了“-r”来表示实例是否为进程,但根据您的实现方式,还有其他方法。

需要注意以下几点:

  • 写入stdin后刷新是必要的,因为Python通常使用缓冲区。
  • 这种方法的编写方式旨在成为自己的模块,因此使用了__file__
  • 由于使用了__file__,如果使用cx_Freeze或类似工具进行冻结,则可能需要修改此方法。

编辑1

可与cx_Freeze一起冻结的版本:

Console.py

import sys, subprocess

class Console():
    def __init__(self, ischild=True):
        if not ischild:
            if hasattr(sys, 'frozen'):
                args = ['Console.exe']
            else:
                args = [sys.executable, __file__]
            self.p = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                creationflags=subprocess.CREATE_NEW_CONSOLE
                )
        else:
            while True:
                data = sys.stdin.read(1)
                if not data:
                    break
                sys.stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

if (__name__ == '__main__'):
    p = Console()

test.py

from Console import Console
import sys, time

if (__name__ == '__main__'):
    p = Console(False)
    for i in range(0, 100):
        p.write('test %i\n' % i)
        time.sleep(1)

setup.py

from cx_Freeze import setup, Executable

setup(
    name = 'Console-test',
    executables = [
        Executable(
            'Console.py',
            base=None,
            ),
        Executable(
            'test.py',
            base=None,
            )
        ]
)

编辑2

新版本,应该可以在像IDLE这样的开发工具下运行

Console.py

#!python3

import ctypes, sys, subprocess

Kernel32 = ctypes.windll.Kernel32

class Console():
    def __init__(self, ischild=True):
        if ischild:
            # try allocate new console
            result = Kernel32.AllocConsole()
            if result > 0:
                # if we succeed open handle to the console output
                sys.stdout = open('CONOUT$', mode='w')
        else:
            # if frozen we assume its names Console.exe
            # note that when frozen 'Win32GUI' must be used as a base
            if hasattr(sys, 'frozen'):
                args = ['Console.exe']
            else:
                # otherwise we use the console free version of python
                args = ['pythonw.exe', __file__]
            self.p = subprocess.Popen(
                args,
                stdin=subprocess.PIPE
                )
            return
        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            sys.stdout.write(data)

    def write(self, data):
        self.p.stdin.write(data.encode('utf8'))
        self.p.stdin.flush()

if (__name__ == '__main__'):
    p = Console()

test.py

from Console import Console
import sys, time

if (__name__ == '__main__'):
    p = Console(False)
    for i in range(0, 100):
        p.write('test %i\n' % i)
        time.sleep(1)

安装.py

from cx_Freeze import setup, Executable

setup(
    name = 'Console-test',
    executables = [
        Executable(
            'Console.py',
            base='Win32GUI',
            ),
        Executable(
            'test.py',
            base=None,
            )
        ]
)

这可以做得更加鲁棒一些,即始终检查是否存在控制台,并在创建新控制台之前分离它(如果找到),可能有更好的错误处理。


我在你原始代码的每个函数、循环和if块的开头放置了print(sys.executable)。它总是在每个常规输出后打印出"C:\Python34\python.exe"。无论我是通过Windows命令行执行还是通过Eclipse运行它。 - Jonathan Root
2
@JonathanRoot 我已经编辑了我的答案,并提供了第二个版本的代码,希望能在你的开发环境下正常工作。基本上我认为当进程被创建时,开发工具(包括IDLE)会劫持stdin和out,通过在进程启动后创建控制台,我们可以设置stdin和out以适应我们的需求。 - James Kent
1
如果你有兴趣,我已经想出如何将其转换为完整的额外控制台,即输出和输入都可以,我已经编写了代码,使其始终是输出,而输入是可选的,请参见此处:https://github.com/JamesGKent/python-snippets/tree/master/Console/Console2 - James Kent
1
好消息!当我在PyDev / Eclipse上运行测试脚本时,您的第二次编辑也起作用了。绝对没有什么可抱怨的了 - 即使我想要。 ─ 您的全双工双控制台解决方案远远超出了要求,但它看起来非常有趣,我将在周末仔细查看代码。 ─ 我想再次感谢您,如果可以的话,我会让赏金自动分配给您的答案,我以我的词作为约束承诺。不过,如果您想要在周末开始享受额外的声誉,这也不是问题。 - Jonathan Root
@JonathanRoot 很荣幸,说实话我喜欢这个挑战,而且总是喜欢找到颠覆事物“应该”工作方式的方法。 - James Kent
显示剩余5条评论

0

由于您正在使用Windows操作系统,因此可以使用win32console模块打开第二个控制台或多个控制台以输出线程或子进程的内容。这是最简单和最容易的方法,适用于Windows操作系统。

以下是示例代码:

import win32console
import multiprocessing

def subprocess(queue):
    win32console.FreeConsole() #Frees subprocess from using main console
    win32console.AllocConsole() #Creates new console and all input and output of subprocess goes to this new console
    while True:
        print(queue.get())
        #prints any output produced by main script passed to subprocess using queue

if __name__ == "__main__": 
    queue = multiprocessing.Queue()
    multiprocessing.Process(target=subprocess, args=[queue]).start()
    while True:
        print("Hello World in main console")
        queue.put("Hello work in sub process console")
        #sends above string to subprocess and it prints it into its console

        #and whatever else you want to do in ur main process

您还可以使用线程来完成此操作。如果您想要队列功能,则必须使用队列模块,因为线程模块没有队列功能。

这里是win32console模块文档


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接