Python:检查命名管道是否有数据

6
我在Unix系统上有一个Python3进程一直在运行,我想从其他偶尔运行的进程通过命名管道随机向它发送数据。如果命名管道没有数据,我希望我的进程继续执行其他任务,因此我需要检查它是否有数据,但又不想阻塞进程。
我无法找到在不打开命名管道的情况下进行检查的方法,但是如果我设置了非阻塞标志,则打开会阻塞。如果我设置了标志,则在读取期间或之前写入管道时会导致崩溃。
这是我能做到的最好的方法:
import os

fifo = "pipe_test.fifo"
done = False
fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
while not done:
    try:
        s = os.read(fd, 1024) # buffer size may need tweaking
        print(s)
        done = True
    except BlockingIOError as e:
        pass
os.close(fd)

如果管道中没有数据,我会得到b""并退出。如果管道中有数据,它会出现一次异常,然后重试,最终获取数据。看起来好像我做错了什么,可能会遇到奇怪的竞态条件。有更好的方法来解决这个问题吗?

1
你能否使用第二个线程来等待阻塞管道? - Jasper
@Jasper 我可以做到。虽然这也不是理想的解决方案,但我承认它更好。此外,我可以使用Python的文件库来代替低级别的“os”操作,这样可以让我读取换行符等内容。谢谢,如果没有其他线程检查的方法,我将把代码改成那个方法。 - sudo
2个回答

4

如果你可以更改客户端的代码,我建议不要使用命名管道,而是使用UNIX域套接字,因为它们支持数据报:

import errno, fcntl, os, socket

服务器:

# bind socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.bind('pipe_test.fifo')
# set socket non-blocking
fcntl.fcntl(sock.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

# get a datagram
try:
    datagram = sock.recv(1024)
except (OSError, socket.error) as ex:
    if ex.errno not in (errno.EINTR, errno.EAGAIN):
        raise
else:
    print('Datagram: %r' % datagram)

客户端:

sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.sendto('Hello!', 'pipe_test.fifo')

但你可能想要研究多线程,而不是使用非阻塞套接字。


我不知道数据报。这很有用。但是看起来你抛出和捕获错误的方式与我类似。这样安全吗? - sudo
@sudo,是的。如果没有等待接收的数据报,则会抛出EAGAIN。如果信号中断了接收,则会抛出EINTR。对于非阻塞套接字来说,后一种情况不应该是不可能的,但安全起见也无妨。可能的错误包括:读取缓冲区太小(但您知道自己的数据),以及客户端无法发送数据,因为您读取得太慢。套接字的缓冲区默认为200kB或512个数据包,参见https://dev59.com/bWEh5IYBdhLWcg3w7nSW#22007520。 - Kijewski

0

这并不是真正的答案,但如果对任何人有用的话,这是我如何使用另一个线程来完成它的方法。

class QueryThread(threading.Thread):

    def __init__(self, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.daemon = True
        self.buf = []
        if not general.f_exists("pipe"):
            os.mkfifo("pipe")

    def run(self):
        f = open("pipe")
        while True:
            try:
                query = next(f).replace("\n", "")
                if query != "":
                    self.buf.append(query)
                    print("Read in new query from pipe: {}, buf = {}".format(query, self.buf))
            except StopIteration: # not a pipe error, just means no data is left, so time to re-open
                f.close()
                f = open("pipe")
        f.close()

    def get_query(self):
        if len(self.buf) == 0: return ""
        query = self.buf[0]
        self.buf.__delitem__(0)
        return query

它将以换行符分隔的消息保存在缓冲区中。您可以从另一个线程调用get_query方法并获取最后接收到的消息。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接