你如何判断sys.stdin.readline()是否会阻塞?

15
如何确定调用sys.stdin.readline()(或更一般地说,任何基于文件描述符的文件对象上的readline())是否会阻塞?
当我在python中编写基于行的文本过滤程序时,这个问题就出现了; 也就是说,该程序重复从输入读取一行文本,可能对其进行转换,然后将其写入输出。
我想实现一个合理的输出缓冲策略。 我的标准是:
1. 在处理数百万行数据时,它应该是高效的-大多数情况下缓冲输出,偶尔刷新。 2. 在保持缓冲输出时不应阻塞输入。
因此,非缓冲输出不好,因为它违反了(1)(向操作系统写入太多次)。 而行缓冲输出也不好,因为它仍然违反了(1)(在批量处理一百万行中的每一行时将输出刷新到操作系统没有意义)。 默认缓冲输出也不好,因为它违反了(2)(如果输出是文件或管道,则会不适当地保留输出)。
我认为,在大多数情况下,一个好的解决方案是: “每当(其缓冲区已满或)sys.stdin.readline()即将阻塞时,刷新sys.stdout”。 可以实现吗?
(请注意,我不断言这种策略对于所有情况都是完美的。例如, 在程序受CPU限制的情况下,它可能不理想;在这种情况下,更频繁地刷新以避免在执行长时间计算时保留输出可能是明智的。)
为了明确起见,让我们说我正在使用python实现unix的“cat -n”程序。

(实际上,“cat -n”比逐行读取更智能;也就是说,它知道在完整读取一行之前如何读取和写入部分行;但是,对于这个例子,我仍然要逐行实现它。)

逐行缓冲实现

(表现良好,但违反标准(1),即由于刷新太频繁而速度过慢):

#!/usr/bin/python
# cat-n.linebuffered.py
import sys
num_lines_read = 0
while True:
  line = sys.stdin.readline()
  if line == '': break
  num_lines_read += 1
  print("%d: %s" % (num_lines_read, line))
  sys.stdout.flush()

默认缓冲实现

(速度快但违反标准(2),即不友好的输出保留)

#!/usr/bin/python
# cat-n.defaultbuffered.py
import sys
num_lines_read = 0
while True:
  line = sys.stdin.readline()
  if line == '': break
  num_lines_read += 1
  print("%d: %s" % (num_lines_read, line))

期望实现:

#!/usr/bin/python
num_lines_read = 0
while True:
  if sys_stdin_readline_is_about_to_block():  # <--- How do I implement this??
    sys.stdout.flush()
  line = sys.stdin.readline()
  if line == '': break
  num_lines_read += 1
  print("%d: %s" % (num_lines_read, line))

所以问题是:是否可以实现sys_stdin_readline_is_about_to_block()
我希望得到适用于Python2和Python3的答案。 我已经研究了以下每种技术,但迄今为止都无济于事。
使用select([sys.stdin],[],[],0)来判断是否从sys.stdin读取会阻塞。(当sys.stdin是缓冲文件对象时,这种方法不适用,至少有一到两个原因:(1)如果底层输入管道中准备好了一个部分行,则它会错误地显示“不会阻塞”,(2)如果sys.stdin的缓冲区包含一个完整的输入行但底层管道还没有准备好进行额外的读取,则它会错误地显示“会阻塞”...我想。)
使用os.fdopen(sys.stdin.fileno(), 'r')fcntlO_NONBLOCK进行非阻塞io。(我无法在任何Python版本中让它与readline()配合使用:在Python2.7中,每当出现部分行时,它都会丢失输入;在Python3中,似乎无法区分“将阻塞”和“输入结束”。??) asyncio(我不确定python2中有哪些内容可用;而且我认为它不能与sys.stdin一起使用;但是,如果仅在从subprocess.Popen()返回的管道中读取时有效,我仍然很感兴趣。)
创建一个线程来执行readline()循环,并通过queue.Queue将每行传递给主程序;然后,在每次从队列中读取每行之前,主程序可以轮询队列,并在看到它即将阻塞时先刷新stdout。(我尝试过这种方法,并且实际上使其工作了,请参见下面的内容,但是它非常缓慢,比行缓冲要慢得多。)

线程实现:

请注意,这并不严格回答“如何判断sys.stdin.readline()是否会阻塞”的问题,但它仍然成功地实现了所需的缓冲策略。但是它的速度太慢了。

#!/usr/bin/python
# cat-n.threaded.py
import queue
import sys
import threading
def iter_with_abouttoblock_cb(callable, sentinel, abouttoblock_cb, qsize=100):
  # child will send each item through q to parent.
  q = queue.Queue(qsize)
  def child_fun():
    for item in iter(callable, sentinel):
      q.put(item)
    q.put(sentinel)
  child = threading.Thread(target=child_fun)
  # The child thread normally runs until it sees the sentinel,
  # but we mark it daemon so that it won't prevent the parent
  # from exiting prematurely if it wants.
  child.daemon = True
  child.start()
  while True:
    try:
      item = q.get(block=False)
    except queue.Empty:
      # q is empty; call abouttoblock_cb before blocking
      abouttoblock_cb()
      item = q.get(block=True)
    if item == sentinel:
      break  # do *not* yield sentinel
    yield item
  child.join()

num_lines_read = 0
for line in iter_with_abouttoblock_cb(sys.stdin.readline,
                                      sentinel='',
                                      abouttoblock_cb=sys.stdout.flush):
  num_lines_read += 1
  sys.stdout.write("%d: %s" % (num_lines_read, line))

验证缓冲行为:

以下命令(在Linux的Bash中)展示了预期的缓冲行为:"defaultbuffered"缓冲过于积极,而"linebuffered"和"threaded"则恰到好处。

(注意,管道末尾的| cat是为了使Python默认块缓冲而不是行缓冲。)

for which in defaultbuffered linebuffered threaded; do
  for python in python2.7 python3.5; do
    echo "$python cat-n.$which.py:"
      (echo z; echo -n a; sleep 1; echo b; sleep 1; echo -n c; sleep 1; echo d; echo x; echo y; echo z; sleep 1; echo -n e; sleep 1; echo f) | $python cat-n.$which.py | cat
  done
done

输出:

python2.7 cat-n.defaultbuffered.py:
[... pauses 5 seconds here. Bad! ...]
1: z
2: ab
3: cd
4: x
5: y
6: z
7: ef
python3.5 cat-n.defaultbuffered.py:
[same]
python2.7 cat-n.linebuffered.py:
1: z
[... pauses 1 second here, as expected ...]
2: ab
[... pauses 2 seconds here, as expected ...]
3: cd
4: x
5: y
6: z
[... pauses 2 seconds here, as expected ...]
6: ef
python3.5 cat-n.linebuffered.py:
[same]
python2.7 cat-n.threaded.py:
[same]
python3.5 cat-n.threaded.py:
[same]

时间:

(在 Linux 的 bash 中):

for which in defaultbuffered linebuffered threaded; do
  for python in python2.7 python3.5; do
    echo -n "$python cat-n.$which.py:  "
      timings=$(time (yes 01234567890123456789012345678901234567890123456789012345678901234567890123456789 | head -1000000 | $python cat-n.$which.py >| /tmp/REMOVE_ME) 2>&1)
      echo $timings
  done
done
/bin/rm /tmp/REMOVE_ME

输出:

python2.7 cat-n.defaultbuffered.py:  real 0m1.490s user 0m1.191s sys 0m0.386s
python3.5 cat-n.defaultbuffered.py:  real 0m1.633s user 0m1.007s sys 0m0.311s
python2.7 cat-n.linebuffered.py:  real 0m5.248s user 0m2.198s sys 0m2.704s
python3.5 cat-n.linebuffered.py:  real 0m6.462s user 0m3.038s sys 0m3.224s
python2.7 cat-n.threaded.py:  real 0m25.097s user 0m18.392s sys 0m16.483s
python3.5 cat-n.threaded.py:  real 0m12.655s user 0m11.722s sys 0m1.540s

再次强调,我希望有一个解决方案,即使缓冲输出仍不会阻塞(“linebuffered”和“threaded”在这方面都很好),而且速度也要快:也就是说,与“defaultbuffered”相比速度要相当。


1
我不确定在没有错误的可能性的情况下,你是否能够知道stdin.readline()是否会阻塞。因此,我不知道你所期望的实现是否可行。 - g.d.d.c
2个回答

3
你当然可以使用 select,因为它是干这个的,并且在少量文件描述符的情况下性能良好。你必须自己实现行缓冲/断点,以便在缓冲(事实证明只是)部分行之后检测是否有更多输入可用。
你可以自己进行所有缓冲(这是合理的,因为select在文件描述符级别上运行),或者将stdin设置为非阻塞,并使用file.read()BufferedReader.read()(取决于您的Python版本)消耗任何可用内容。无论缓冲如何,如果您的输入可能是Internet套接字,则必须使用非阻塞输入,因为select的常见实现可能会虚假地指示套接字中存在可读数据。(Python 2版本在这种情况下引发带有EAGAINIOError;Python 3版本返回None。)
os.fdopen无法在此处提供帮助,因为它不会为fcntl创建新的文件描述符。在某些系统上,您可以使用O_NONBLOCK打开/dev/stdin。)
一个基于默认(缓冲)file.read()的Python 2实现:
import sys,os,select,fcntl,errno

fcntl.fcntl(sys.stdin.fileno(),fcntl.F_SETFL,os.O_NONBLOCK)

rfs=[sys.stdin.fileno()]
xfs=rfs+[sys.stdout.fileno()]

buf=""
lnum=0
timeout=None
rd=True
while rd:
  rl,_,xl=select.select(rfs,(),xfs,timeout)
  if xl: raise IOError          # "exception" occurred (TCP OOB data?)
  if rl:
    try: rd=sys.stdin.read()    # read whatever we have
    except IOError as e:        # spurious readiness?
      if e.errno!=errno.EAGAIN: raise # die on other errors
    else: buf+=rd
    nl0=0                       # previous newline
    while True:
      nl=buf.find('\n',nl0)
      if nl<0:
        buf=buf[nl0:]           # hold partial line for "processing"
        break
      lnum+=1
      print "%d: %s"%(lnum,buf[nl0:nl])
      timeout=0
      nl0=nl+1
  else:                         # no input yet
    sys.stdout.flush()
    timeout=None

if buf: sys.stdout.write("%d: %s"%(lnum+1,buf)) # write any partial last line

仅使用 cat -n,我们可以在获取部分行时立即将其写出,但这会保留它们以表示一次性处理整个行。

在我的(不起眼的)计算机上,您的 yes 测试需要 "real 0m2.454s user 0m2.144s sys 0m0.504s"。


谢谢你的提示。关于非阻塞I/O的fdopen/fcnt,我使用它们是正确的,只是在提问时打错了字;我刚刚纠正了它。我认为我遇到的问题是我仍然尝试使用文本模式和sys.stdin.readline(),即使是在非阻塞I/O下,这根本行不通(以各种不优雅的方式,例如在Python3中返回""当没有东西可读时,使其无法与文件结尾区分开来)。 - Don Hatch
@DonHatch:我很乐意编辑fdopen部分,但首先有一个问题:你为什么要fdopen stdin呢?它甚至不会创建一个新的文件描述符,更不用说一个能支持O_NONBLOCK的新打开文件描述符了。 - Davis Herring
你说得对,我关于设置O_NONBLOCK的描述仍然有些混乱。实际上,我也尝试过使用fdopen stdin.fileno(),但那不是为了获取非阻塞,而是为了尝试获得具有各种不同缓冲模式的文件对象...特别是buffering=0(这似乎又要求我将其放入非文本模式'rb'中...不幸的是,这是有侵入性的,因为我希望不要干扰原始的readline()语义)。 - Don Hatch
@DonHatch:在Python 3中,您可以使用sys.stdin.buffer.raw获取底层未缓冲的流。您不能拥有未缓冲的文本流,因为它们可能需要保留少于一个字符的字节。 - Davis Herring
你的回答听起来像是“必须这样做”,而不是“可以这样做”,因为我只看到一种通过迷宫的方法,那就是放弃readline()。总结一下:“要判断读取是否会阻塞,请在文件描述符上使用select(...,timeout=0),这意味着你必须自己实现缓冲(包括readline()),这又意味着你需要输入流是未经缓冲的,进而意味着它必须是二进制的(所以如果你想要文本模式,你也必须自己实现);此外,由于select可能会出现误报,你需要将fd设置为非阻塞的。” 我理解得对吗? - Don Hatch
如果您想展示使用您所思考的技术实现“cat -n的期望实现”的干净实现,那将不胜感激。特别是,如果我之前的评论中描述的“迷宫只有一条路”是不正确的(例如,如果您知道如何利用BufferedReader--我不知道),也许您可以通过展示您所考虑的各种实现来澄清。 - Don Hatch

-1
# -*- coding: utf-8 -*-
import os
import sys
import select
import fcntl
import threading


class StdInput:
    def __init__(self):
        self.close_evt = threading.Event()

        fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK);
        self.input = (sys.stdin.original_stdin if hasattr(sys.stdin, "original_stdin") else sys.stdin)
        self.epoll = select.epoll()
        self.epoll.register(sys.stdin.fileno(), select.EPOLLIN | select.EPOLLPRI | select.EPOLLERR | select.EPOLLHUP | select.EPOLLRDBAND)

    def read(self):
        while not self.close_evt.is_set():
            input_line = self.input.readline()
            # If the object is in non-blocking mode and no bytes are available, None is returned.
            if input_line is not None and len(input_line) > 0:
                break           
            print("Nothing yet...")
            evt_lst = self.epoll.poll(1.0)  # Timeout 1s
            print("Poll exited: event list size={}".format(len(evt_lst)))
            if len(evt_lst) > 0:
                assert len(evt_lst) == 1
                if (evt_lst[0][1] & (select.EPOLLERR | select.EPOLLHUP)) > 0:
                    raise Exception("Ooops!!!")
        return input_line


if __name__ == "__main__":
    i = StdInput()

    def alm_handle():
        i.close_evt.set()
    threading.Timer(4, alm_handle).start()

    print("Reading...")
    input_line = i.read()
    print("Read='{}'".format(input_line))

2
虽然这段代码可能回答了问题,但提供有关它如何以及/或为什么解决问题的附加上下文将改善答案的长期价值。 - Piotr Labunski

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接