如何从`stdin`创建非阻塞的持续读取?

7

我有一个单一的进程,它是这样创建的:

p = subprocess.Popen(args   = './myapp',
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

稍后,我尝试向“p”的标准输入(stdin)写入内容。
p.stdin.write('my message\n')

myapp 进程的设置如下:

q = queue.Queue()
def get_input():
    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()

它试图持续读取新行,就像这样:

try:
    print('input:', q.get_nowait())
except Empty:
    print('no input')

很遗憾,子进程从未收到我的任何消息。当然,当我使用以下代码时:
p.communicate('my message\n')

子进程接收到消息,但是正如预期的那样,communicate方法关闭了pstdin,因此没有更多的通信正在进行。


2
如果您不想结束进程,那么就不应该使用 communicate(它只发送数据,然后等待进程终止);而是直接写入 p.stdin - poke
stdin.flush()?使用像async_subprocess这样的模块如何? - Inbar Rose
@InbarRose 已经尝试过了,但没有成功。 - Peter Varo
你能用语言描述一下什么是“非阻塞连续从stdin读取”吗?我可以理解“连续”:for line in sys.stdin: got(line)(注意:你不需要iter(...),这里没有预读错误)-它逐行读取直到EOF,必要时等待每行。我理解“非阻塞”q.get_nowait()会立即返回一行或引发“Empty”异常。但是,“非阻塞连续”在一起的行为是什么?如果你将q.get_nowait()放在一个循环中,如果出现“Empty”异常,你想做什么(如果答案是什么都不做... - jfs
如果没有必要无休止地旋转和浪费CPU,可以使用阻塞代码。 - jfs
显示剩余4条评论
5个回答

8
p = subprocess.Popen(args   = './myapp',
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

while p.poll() is None:
    data = p.stdout.readline()

这将创建一个非阻塞读取您的进程,直到该进程退出。 但是,请注意这里有一些要注意的事项。例如,如果您也将stderr管道化,但没有从中读取..那么你很可能会填充一个或两个缓冲区,并且程序将挂起。因此,在手动执行操作时,始终确保清除任何缓冲区I/O。 更好的选择是使用select.epoll()(如果可能),这仅在unix系统上可用,但提供了更好的性能和错误处理 :)
epoll = select.epoll()
epoll.register(p.stdout.fileno(), select.EPOLLHUP) # Use select.EPOLLIN for stdin.

for fileno, event in epoll.poll(1):
    if fileno == p.stdout.fileno():
        # ... Do something ...

注意:请记住,每当一个进程需要输入时,它通常会通过stdout指示这一点,因此您仍然需要使用select.epoll来注册STDOUT以检查是否“等待输入”。您可以注册select.EPOLLIN来检查是否输入了内容,但我认为这没有意义,因为您应该已经知道输入的内容。

检查进程是否需要输入

您可以使用select.epoll来检查进程是否正在等待输入,而不会阻止您的应用程序执行上述示例。但是有更好的替代方案。

Pexpect是一个非常好的库,可以与SSH一起使用。

它的工作方式略有不同,但可能是一个很好的选择。

让subprocess.popen与SSH一起工作

如果您需要这样做(因为SSH将以受保护的方式生成一个stdin),我将重定向到另一个问题+答案。

Python + SSH Password auth (no external libraries or public/private keys)?


1
首先,感谢您的答复。其次,您只提到了“stdout”,那么这对“stdin”也有效吗?因为我的问题特别针对它。 - Peter Varo
1
你为什么认为 p.stdout.read() 是非阻塞的?它只有在读到 EOF 时才会返回。 - jfs
@J.F.Sebastian 正确,我使用了read()而不是readline(),这是我的错误。这就是为什么我不完全准备好从脑海中编写代码的原因 :) - Torxed
1
你为什么认为 p.stdout.readline() 是非阻塞的呢?它只有在遇到换行符或 EOF 时才会返回。此外,在 Python 3 中,p.poll() 是不必要的:for line in p.stdout: 可以正常工作。 - jfs
for line in p.stdout: 在 Python 2 上也可以工作,但存在预读错误,因此您必须使用 for line in iter(p.stdout.readline, b''): 替代。如果我想要非阻塞行为,我可能会使用可移植的 asyncio-based基于线程的 (line = q.get(timeout=1)) 方法。 - jfs
显示剩余2条评论

2

我想你可能没有看到正在进行的操作的输出。以下是一个完整的示例,它似乎在我的计算机上可以运行,除非我完全误解了您想要的内容。我所做的主要更改是将pstdout设置为sys.stdout,而不是subprocess.PIPE。也许我误解了您问题的重点,那一部分至关重要...

以下是完整代码和输出:

在发送过程(测试)中(我将其命名为test_comms.py)。我当前正在使用Windows,因此需要使用.bat文件:

import time
import subprocess
import sys

# Note I'm sending stdout to sys.stdout for observation purposes
p = subprocess.Popen(args = 'myapp.bat',
                     stdin  = subprocess.PIPE,
                     stdout = sys.stdout,
                     universal_newlines=True)

#Send 10 messages to the process's stdin, 1 second apart                    
for i in range(10):
    time.sleep(1)
    p.stdin.write('my message\n')

myapp.bat是一个非常简单的脚本:

echo "In the bat cave (script)"
python myapp.py

我的应用程序myapp.py包含以下内容(在当前环境Python 2中使用Queue而不是queue):

import Queue
from Queue import Empty
import threading
import sys
import time

def get_input():
    print("Started the listening thread")
    for line in iter(sys.stdin.readline, ''):
        print("line arrived to put on the queue\n")
        q.put(line)
    sys.stdin.close()

print("Hi, I'm here via popen")    
q = Queue.Queue()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()

print("stdin listener Thread created and started")

# Read off the queue - note it's being filled asynchronously based on 
# When it receives messages.  I set the read interval below to 2 seconds
# to illustrate the queue filling and emptying.
while True:
    time.sleep(2)
    try:
        print('Queue size is',q.qsize())
        print('input:', q.get_nowait())
    except Empty:
        print('no input')

print("Past my end of code...")

输出:

D:\>comms_test.py

D:\>echo "In the bat cave (script)"
"In the bat cave (script)"

D:\>python myapp.py
Hi, I'm here via popen
Started the listening threadstdin listener Thread created and started

line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 2)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 3)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 4)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue

('Queue size is', 5)
('input:', 'my message\n')
line arrived to put on the queue

line arrived to put on the queue


D:\>('Queue size is', 6)
('input:', 'my message\n')
('Queue size is', 5)
('input:', 'my message\n')
('Queue size is', 4)
('input:', 'my message\n')
('Queue size is', 3)
('input:', 'my message\n')
('Queue size is', 2)
('input:', 'my message\n')
('Queue size is', 1)
('input:', 'my message\n')
('Queue size is', 0)
no input
('Queue size is', 0)
no input
('Queue size is', 0)
no input

除非在Python脚本中重新分配sys.stdout,否则完全省略stdout参数应该具有相同的效果。 - jfs
Python 3 中的缓冲存在几个错误; 我会使用 print('my message', file=p.stdin, flush=True) 而不是 p.stdin.write('my message\n')。传递显式的 bufsize=1 - jfs
除非每2秒必须打印“无输入”,否则我会使用简单的for line in sys.stdin: print('input: ' + line, end='')而不是Thread,Queue等。 - jfs
感谢有用的评论 - 我正在移动中,但稍后会进行编辑。说实话,我对使用情况并不完全清楚,根据任何 OP 的评论可能需要进行重大修订。也会在有机会时测试 Python 3。 - J Richard Snape

2

为了使所有内容正常工作,您需要在主进程(p.stdout)和子进程(sys.stdout)中刷新输出。

communicate会执行两个刷新操作:

  • 当关闭p.stdin时,它会刷新它
  • 它等待sys.stdout的输出被刷新(就在退出之前)

以下是一个工作正常的main.py示例:

import subprocess,time
import sys
p = subprocess.Popen(args   = ['python3', './myapp.py'],
                     stdin  = subprocess.PIPE,
                     stdout = subprocess.PIPE,
                     universal_newlines=True)

time.sleep(0.5)
p.stdin.write('my message\n')
p.stdin.flush()
#print("ici")
for i,l in  enumerate(iter(p.stdout.readline, ''),start=1):

    print("main:received:",i,repr(l))
    if i == 6:
        break
    print("mainprocess:send:other message n°{}".format(i))
    p.stdin.write("other message n°{}\n".format(i))
    p.stdin.flush()

print("main:waiting for subprocess")
p.stdin.close()    
p.wait()

myapp.py示例 导入 queue、threading、sys、time 和 rpdb 模块

q = queue.Queue()
def get_input():
    for line in iter(sys.stdin.readline, ''):
        q.put(line)
    sys.stdin.close()

threading.Thread(name   = 'input-getter',
                 target = get_input).start()
for i in range(6):
    try:
        l= q.get_nowait()
        print('myapp:input:', l,end="")
        sys.stdout.flush()

    except queue.Empty:
        print("myapp:no input")
        sys.stdout.flush()    
        time.sleep(1)

结果:

main:received: 1 'myapp:no input\n'
mainprocess:send:other message n°1
main:received: 2 'myapp:input: my message\n'
mainprocess:send:other message n°2
main:received: 3 'myapp:input: other message n°1\n'
mainprocess:send:other message n°3
main:received: 4 'myapp:no input\n'
mainprocess:send:other message n°4
main:received: 5 'myapp:input: other message n°2\n'
mainprocess:send:other message n°5
main:received: 6 'myapp:input: other message n°3\n'
main:waiting for subprocess

1
我已经编写了一个程序,可以异步地处理涉及IO的所有操作。它在一个线程上读取输入,在另一个线程上输出,在一个进程中创建进程,并在一个线程上与该进程进行通信。
我不确定您的程序需要完成什么任务,但希望这段代码能够实现它。
# Asynchronous cat program!

# Asynchronously read stdin
# Pump the results into a threadsafe queue
# Asynchronously feed the contents to cat
# Then catch the output from cat and print it
# Thread all the things

import subprocess
import threading
import queue
import sys

my_queue = queue.Queue()

# Input!
def input_method():
    for line in sys.stdin: # End on EOF
        if line == 'STOP\n': # Also end on STOP
            break
        my_queue.put(line)
input_thread = threading.Thread(target=input_method)
input_thread.start()

print ('Input thread started')


# Subprocess!
cat_process = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)

print ('cat process started')

queue_alive = True
# Continuously dump the queue into cat
def queue_dump_method():
    while queue_alive:
        try:
            line = my_queue.get(timeout=2)
            cat_process.stdin.write(line.encode())
            cat_process.stdin.flush() # For some reason, we have to manually flush
            my_queue.task_done() # Needed?
        except queue.Empty:
            pass
queue_dump_thread = threading.Thread(target = queue_dump_method)
queue_dump_thread.start()

print ('Queue dump thread started')

# Output!
def output_method():
    for line in cat_process.stdout:
        print(line)
output_thread = threading.Thread(target=output_method)
output_thread.start()

print ('Output thread started')


# input_thread will die when we type STOP
input_thread.join()
print ('Input thread joined')

# Now we wait for the queue to finish processing
my_queue.join()
print ('Queue empty')

queue_alive = False
queue_dump_thread.join()
print ("Queue dump thread joined")

# Send EOF to cat
cat_process.stdin.close()

# This kills the cat
cat_process.wait()
print ('cat process done')

# And make sure we're done outputting
output_thread.join()
print ('Output thread joined')

PS. 这个程序显然很愚蠢,我怀疑你不需要异步地进行所有这些IO操作。 - QuestionC

1
我尝试调查您的程序,编写了自己的“不断流数据到cat并捕获返回值”的程序。我没有实现子进程部分,但希望结构类似。
这行对于您的程序非常奇怪...
for line in iter(sys.stdin.readline, ''):
    q.put(line)
sys.stdin.close()

That looks an awful lot like

for line in stdin:
    q.put(line)

请注意,当管道关闭时循环将结束,之后无需重新关闭它。
如果您需要持续异步读取标准输入流,则应该能够构建一个与下面代码中的child_reader几乎相同的读取线程。只需将child.stdout替换为stdin即可。
import subprocess
import threading
import random

# We may need to guard this?
child = subprocess.Popen('cat', stdout=subprocess.PIPE, stdin=subprocess.PIPE)

# Continuously print what the process outputs...
def print_child():
    for line in child.stdout:
        print(line)

child_reader = threading.Thread(target = print_child)
child_reader.start()

for i in range(10000):
    chars = 'ABC\n'
    child.stdin.write(random.choice(chars).encode())

# Send EOF.
# This kills the cat.
child.stdin.close()

# I don't think order matters here?
child.wait()
child_reader.join()

  1. 你说得对,在Python 3中,for line in stdin就可以工作,不需要使用iter(..)
  2. “之后没有必要重新关闭它”是错误的。你需要这样做来避免依赖垃圾回收来处理相应的文件描述符(注意:不要混淆父进程和子进程中的管道--它们是连接的,但每个进程都有自己的一组)。
- jfs
好的,我可以理解清理文件描述符,但是 sys.stdin.close() 呢? - QuestionC
我指的是subprocess' pipes,例如你代码中的child.stdout。我同意,在大多数情况下关闭sys.stdin没有意义。 - jfs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接