非阻塞FIFO

5
我如何在两个Python进程之间创建一个FIFO(first in first out),使得当读取者无法处理输入时,允许丢弃行?
  • 如果读取者尝试比写入者更快地进行readreadline操作,则应该阻塞。
  • 如果读取者无法像写入者一样快地工作,则写入者不应阻塞。除了一次只缓冲一行以外,不应缓冲行,并且只有最后一行写入的数据应在读取者下一次readline尝试中接收到。

使用命名管道是否可以实现此目的?还是有其他简单的方法可以实现这个目标?

2个回答

3
以下代码使用命名管道来允许两个脚本之间进行通信。
  • 如果读者试图比写者更快地进行read操作,它会被阻塞。
  • 如果读者跟不上写者,写者不会被阻塞。
  • 操作是基于缓冲区的。目前尚未实现基于行的操作。
  • 此代码应视为概念验证。延迟和缓冲区大小是任意的。

代码

import argparse
import errno
import os
from select import select
import time

class OneFifo(object):
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        if os.path.exists(self.name):
            os.unlink(self.name)
        os.mkfifo(self.name)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if os.path.exists(self.name):
            os.unlink(self.name)

    def write(self, data):
        print "Waiting for client to open FIFO..."
        try:
            server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENXIO:
                server_file = None
            else:
                raise
        if server_file is not None:
            print "Writing line to FIFO..."
            try:
                os.write(server_file, data)
                print "Done."
            except OSError as exc:
                if exc.errno == errno.EPIPE:
                    pass
                else:
                    raise
            os.close(server_file)

    def read_nonblocking(self):
        result = None
        try:
            client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                client_file = None
            else:
                raise
        if client_file is not None:
            try:
                rlist = [client_file]
                wlist = []
                xlist = []
                rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
                if client_file in rlist:
                    result = os.read(client_file, 1024)
            except OSError as exc:
                if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
                    result = None
                else:
                    raise
            os.close(client_file)
        return result

    def read(self):
        try:
            with open(self.name, 'r') as client_file:
                result = client_file.read()
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                result = None
            else:
                raise
        if not len(result):
            result = None
        return result

def parse_argument():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--client', action='store_true',
                        help='Set this flag for the client')
    parser.add_argument('-n', '--non-blocking', action='store_true',
                        help='Set this flag to read without blocking')
    result = parser.parse_args()
    return result

if __name__ == '__main__':
    args = parse_argument()
    if not args.client:
        with OneFifo('known_name') as one_fifo:
            while True:
                one_fifo.write('one line')
                time.sleep(0.1)
    else:
        one_fifo = OneFifo('known_name')
        while True:
            if args.non_blocking:
                result = one_fifo.read_nonblocking()
            else:
                result = one_fifo.read()
            if result is not None:
                print result

服务器检查客户端是否打开了FIFO。如果客户端已经打开了FIFO,则服务器会写入一行内容。否则,服务器将继续运行。我实现了非阻塞读取,因为阻塞读取会导致一个问题:如果服务器重新启动,大多数情况下客户端会一直被阻塞,永远无法恢复。使用非阻塞客户端,更容易容忍服务器重新启动。
[user@machine:~] python onefifo.py
Waiting for client to open FIFO...
Waiting for client to open FIFO...
Writing line to FIFO...           
Done.
Waiting for client to open FIFO...
Writing line to FIFO...
Done.

[user@machine:~] python onefifo.py -c
one line
one line

注意事项

启动时,如果 server 发现 FIFO 已经存在,则它会将其删除。这是通知 clients 服务器已重启的最简单方法。阻塞版本的 client 通常会忽略此通知。


这很酷。客户端如何告诉服务器它已准备好接收?它能否告知客户端是否已打开FIFO?服务器是否使用os.O_NONBLOCK来强制执行此操作? - dronus
如果服务器尝试打开FIFO并收到ENXIO(设备未配置)错误,则知道客户端尚未打开FIFO。这种测试仅在服务器使用os.O_NONBLOCK打开FIFO时才有效。否则,当服务器调用open时,它会被阻塞。 - user3657941

0

据我所知,那并不是一个先进先出(队列)- 这是一个单一的变量。如果您设置了最大1个大小的队列或管道,那么它可能是可实现的,但是,使用Lock在其中一个进程中对单个对象进行引用,而另一个进程通过代理对象引用该对象似乎会更好。读者每次读取时将其设置为None,而写者每次写入时都会覆盖内容。

你可以通过将对象的代理和锁的代理作为参数传递给所有相关进程来将它们传递到其他进程。为了更方便地获取它,你可以使用Manager,它提供了一个带有代理的单个对象,你可以传递它,其中包含并提供其他对象(包括锁)的代理。这个答案提供了一个有用的示例,展示了正确使用Manager将对象传递到新进程中。


好的,但是Lock代理对象没有外部表示,例如我可以在一个Python程序中使用它们,但是不能在没有文件系统句柄的情况下将两个程序连接起来。 - dronus
@dronus 当启动新进程时,您可以将其作为参数传递,或使用manager更方便地获取它(因为您只需要传递一个东西)。将编辑答案以包括此内容。 - Vivian

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接