从sys.stdin获取输入,非阻塞

28

我正在为一个竞赛制作机器人,它通过 sys.stdin 接收输入,并使用 Python 的 print() 进行输出。我的代码如下:

import sys

def main():
    while True:
        line = sys.stdin.readline()
        parts = line.split()
        if len(parts) > 0:
            # do stuff

问题在于输入是通过流的方式传入的,使用上述方法会阻止我在流关闭之前打印任何东西。我该怎么做才能使这个程序正常工作?


可能是重复的问题:如何关闭缓冲?(原文链接:https://dev59.com/S2oy5IYBdhLWcg3wo_kn) - Ronny Lindner
1
非阻塞标准输入要么不起作用,要么不太可靠。您可以使用线程/多进程吗?因为那应该可以解决问题。 - Wayne Werner
8个回答

16

关闭阻塞后,您只能每次读取一个字符。因此,在非阻塞上下文中无法让readline()正常工作。我想你只是想读取按键以控制机器人。

在Linux上,我尝试使用select.select()没有成功,但通过调整termios设置创建了一种方法。因此,这仅适用于Linux,但对我有效:

import atexit, termios
import sys, os
import time


old_settings=None

def init_any_key():
   global old_settings
   old_settings = termios.tcgetattr(sys.stdin)
   new_settings = termios.tcgetattr(sys.stdin)
   new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON) # lflags
   new_settings[6][termios.VMIN] = 0  # cc
   new_settings[6][termios.VTIME] = 0 # cc
   termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)


@atexit.register
def term_any_key():
   global old_settings
   if old_settings:
      termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)


def any_key():
   ch_set = []
   ch = os.read(sys.stdin.fileno(), 1)
   while ch is not None and len(ch) > 0:
      ch_set.append( ord(ch[0]) )
      ch = os.read(sys.stdin.fileno(), 1)
   return ch_set


init_any_key()
while True:
   key = any_key()
   if key is not None:
      print(key)
   else:
      time.sleep(0.1)

这里有一个更好的 Windows 或跨平台的解决方案:非阻塞控制台输入?


请注意,这也使终端“非回显”:按键不会显示。以下是另一种实现相同效果的优雅方式:http://ballingt.com/nonblocking-stdin-in-python-3/ - Patrick
1
当使用VS Code在我的Raspberry Pi上进行远程调试时,读取的字符是整数而不是字符串。添加以下内容可以使代码在远程上正常工作: if type(ch[0]) is int: ch_set.append(ch[0]) else: ch_set.append( ord(ch[0])) - Butterkekskrumel
如果您想要正确解释Unicode字符,请使用sys.stdin.read(1)而不是os.read(sys.stdin.fileno(), 1)。 - Stypox

10
您可以使用选择器来处理I/O多路复用: https://docs.python.org/zh-cn/3/library/selectors.html 请尝试这个:
#! /usr/bin/python3

import sys
import fcntl
import os
import selectors

# set sys.stdin non-blocking
orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

# function to be called when enter is pressed
def got_keyboard_data(stdin):
    print('Keyboard input: {}'.format(stdin.read()))

# register event
m_selector = selectors.DefaultSelector()
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

while True:
    sys.stdout.write('Type something and hit enter: ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj)

以上代码将会在该行停留。
for k, mask in m_selector.select():

直到发生已注册的事件,才返回一个选择器键实例(k)和所监控事件的掩码。
在上面的示例中,我们仅注册了一个事件(Enter键按下)。
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

选择器键实例的定义如下:
abstractmethod register(fileobj, events, data=None)

因此,register方法将k.data设置为我们的回调函数got_keyboard_data,并在按下Enter键时调用它:
callback = k.data
callback(k.fileobj)

一个更完整(并且希望更有用)的例子是将用户的 stdin 数据与来自网络的传入连接进行多路复用:

import selectors
import socket
import sys
import os
import fcntl

m_selector = selectors.DefaultSelector()

# set sys.stdin non-blocking
def set_input_nonblocking():
    orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

def create_socket(port, max_conn):
    server_addr = ('localhost', port)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.setblocking(False)
    server.bind(server_addr)
    server.listen(max_conn)
    return server

def read(conn, mask):
    global GO_ON
    client_address = conn.getpeername()
    data = conn.recv(1024)
    print('Got {} from {}'.format(data, client_address))
    if not data:
         GO_ON = False

def accept(sock, mask):
    new_conn, addr = sock.accept()
    new_conn.setblocking(False)
    print('Accepting connection from {}'.format(addr))
    m_selector.register(new_conn, selectors.EVENT_READ, read)

def quit():
    global GO_ON
    print('Exiting...')
    GO_ON = False


def from_keyboard(arg1, arg2):
    line = arg1.read()
    if line == 'quit\n':
        quit()
    else:
        print('User input: {}'.format(line))

GO_ON = True
set_input_nonblocking()

# listen to port 10000, at most 10 connections
server = create_socket(10000, 10)

m_selector.register(server, selectors.EVENT_READ, accept)
m_selector.register(sys.stdin, selectors.EVENT_READ, from_keyboard)

while GO_ON:
    sys.stdout.write('>>> ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj, mask)


# unregister events
m_selector.unregister(sys.stdin)

# close connection
server.shutdown()
server.close()

#  close select
m_selector.close()

你可以使用两个终端进行测试。 第一个终端:
$ python3 test.py 
>>> bla

打开另一个终端并运行:

 $ nc localhost 10000
 hey!

回到第一个

>>> qwerqwer     

结果(显示在主终端上):

$ python3 test.py 
>>> bla
User input: bla

>>> Accepting connection from ('127.0.0.1', 39598)
>>> Got b'hey!\n' from ('127.0.0.1', 39598)
>>> qwerqwer     
User input: qwerqwer

>>> 

请在您的帖子中添加解释,以便未来的访问者清楚易懂。 - Xantium
请注意,自Python 3.5以来,您可以简单地执行os.set_blocking(sys.stdin.fileno(), False)使stdin非阻塞。 - smac89

7
#-----------------------------------------------------------------------
# Get a character from the keyboard.  If Block is True wait for input,
# else return any available character or throw an exception if none is
# available.  Ctrl+C isn't handled and continues to generate the usual
# SIGINT signal, but special keys like the arrows return the expected 
# escape sequences.
#
# This requires:
#
#    import sys, select
#
# This was tested using python 2.7 on Mac OS X.  It will work on any
# Linux system, but will likely fail on Windows due to select/stdin
# limitations.
#-----------------------------------------------------------------------

def get_char(block = True):
    if block or select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
        return sys.stdin.read(1)
    raise error('NoChar')

我相信我曾经在Linux上尝试过这个,但我认为它不起作用。然而,在我现在使用的Mac上,它肯定不起作用。无论块是真还是假,它仍然会阻塞。此外,用户必须按回车键才能释放积累的“洪水”字符。也许尝试将输入模式设置为原始(tty.setraw()),但之后还要将其设置为熟悉的模式。 - dylnmc
这对我来说有点奇怪,使用read(1),但是使用stdin.readline()就完美地解决了。 - Thomas Ahle
我一开始也是这么想的,但后来我意识到他不一定有 tty。他谈论的是一个输入流。如果他知道他有 tty,他确实可以使用 tty.setraw(sys.stdin),但如果竞赛通过 ssh 会话执行他的程序,那就行不通了(例如)。 - Keeely

3

这是一个类似于swdev的答案的posix解决方案。

正如他们所说,你需要使用termios.VMINtermios.VTIME来捕获多个字符,而不需要用户按下Enter。仅使用原始模式将会有问题,因为特殊键如箭头可能会干扰下一个按键。

在这里,我们使用tty.setcbreak()tty.setraw()作为快捷方式,但它们有简短的内部说明

import termios
import tty
import sys
import select

def get_enter_key():
    fd = sys.stdin.fileno()
    orig_fl = termios.tcgetattr(fd)
    try:
        tty.setcbreak(fd)  # use tty.setraw() instead to catch ^C also
        mode = termios.tcgetattr(fd)
        CC = 6
        mode[CC][termios.VMIN] = 0
        mode[CC][termios.VTIME] = 0
        termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
        keypress, _, _ = select.select([fd], [], [])
        if keypress:
            return sys.stdin.read(4095)
    finally:
        termios.tcsetattr(fd, termios.TCSANOW, orig_fl)

try:
    while True:
        print(get_enter_key())
except KeyboardInterrupt:
    print('exiting')
    sys.exit()

请注意,这里有两个可能的超时时间可供添加:


1
你能解释一下为什么要读取这么多字节吗?sys.stdin.read(4095) - Neuron
我写下这段代码已经有一段时间了,但它的作用只是消耗所有输入内容,以防用户粘贴了一些长文本。 - Arnaudv6

0

你应该能够使用以下任一方法消除流:

sys.stdin.read(1)

读取UTF-8解码字符或者:

sys.stdin.buffer.read(1)

读取原始字符。

如果我想要及时从stdin获取原始数据并对其进行处理,而不是先读取换行符或填充内部缓冲区,我会这样做。这适用于通过ssh远程运行程序的情况,其中tty不可用,请参见:

ssh me@host '/usr/bin/python -c "import sys; print(sys.stdin.isatty())"'

在这种情况下,还有一些其他需要考虑的事情,以使程序按预期工作。当完成时,您需要刷新输出以避免缓冲延迟,并且很容易假设程序没有读取输入,而实际上只是没有刷新输出。

stdout.write("my data")
stdout.flush()

通常情况下,输入读取并不是问题所在,而是终端(或程序)提供的输入流在你期望时没有交付,或者可能在你期望时没有读取你的输出。如果你有一个 tty(参见上面的 ssh 检查),你可以使用 tty 模块将其置于原始模式。
import sys
import termios
import tty

old = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin)
c = None
try:
    c = sys.stdin.read(1)[0]
finally:
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old)
print(c)

如果使用Mac/Linux,可以使用相应的方法。如果使用Windows,则可以使用msvcrt.getch()。


0

0
现在你可以直接使用set_blocking。这在Linux上可以工作,并且可能通过改变fd的创建方式来适应其他系统。
class CooperativeInterrupt:

    def __init__(self):
        self.fd = open('/dev/stdin', 'rb')
        os.set_blocking(self.fd.fileno(), False)

    def __del__(self):
        if self.fd is not None:
            self.fd.close()

    def has_interrupt(self):
        data = self.fd.read()
        if data is None:
            return False
        if len(data) == 0:
            raise KeyboardInterrupt()  # Stdin closed (e.g. CTRL+D).
        return True


coi = CooperativeInterrupt()
for i in range(10000000000000):
    if coi.has_interrupt():
        print(i)
    time.sleep(0.1)

-7

使用生成器 - 值得庆幸的是,sys.stdin 已经是一个生成器!

生成器使你能够处理无限流。每次调用它时,它都会返回下一个元素。为了构建生成器,你需要使用 yield 关键字。

for line in sys.stdin:
    print line

    if a_certain_situation_happens:
        break        

如果发生某种期望的情况,请不要忘记在循环中放置一个break语句。

您可以在以下网址找到有关生成器的更多信息:


不过,还有其他因素在起作用吗?比如流是按行缓冲还是块缓冲? - Platinum Azure
3
sys.stdin已经是一个生成器,所以你可以直接使用for line in sys.stdin: ...或者使用更新的fileinput模块。但是两者都不支持非阻塞操作。 - gatoatigrado

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接