在Windows系统中以非阻塞方式读取Python文件

9
我有一个在Windows(Win7)上运行的程序,每隔x秒会向一个txt文件写入内容。现在我有一个Python脚本,每隔x秒读取这个txt文件。当Python脚本正在读取文件并且同时有另一个程序想要写入该文件时,写入程序会崩溃(并显示权限错误)。由于我无法修改程序写入txt文件的方式,因此必须尝试在不阻塞写入程序的情况下打开txt文件进行读取。有谁知道我在这种情况下应该怎么做(非阻塞读取)?非常感谢任何关于这个主题的提示!
尝试读取文件的程序代码大致如下:
    with codecs.open(datapath, "r", 'utf-16') as raw_data:

         raw_data_x = raw_data.readlines()

我必须使用 "codecs" 打开文件,因为它是 Unicode 编码的。


请查看此答案:https://dev59.com/v4rda4cB1Zd3GeqPQLkU#30172682 - Vader
谢谢您的快速回答!我已经看到了那篇文章。我认为它不适用于Windows系统。 - Py42
@Vader:那个链接使用的是UNIX特有的fcntl,而OP正在使用Windows。 - cdarke
1
请查看win32file。它允许更多的Windows特定标志。 - Ben
1
有几种方法可以实现这个,包括OVERLAPPED IO(参见Win32 API CreateFile)和“IO完成端口”https://dev59.com/AUjSa4cB1Zd3GeqPGZTb。您将需要一个Win32扩展模块(我已经在C++中完成了这个,但没有在Python中)。这是一个非平凡的任务。 - cdarke
1
在Windows上能够写入由另一个进程打开的文件似乎是一个单独的问题,与如何使用异步I/O(例如查看multiprocessingasyncio如何使用它)无关。 - jfs
2个回答

7

经过长时间的努力,我终于创建了一个使用ctypes实现的函数来帮助您完成这个任务。请注意,如果进程已经获得了“独占”访问权限,此方法将无效。如果是这样,您需要使用影子复制服务,例如这里或者这里所示的实现。
不管怎样,以下是代码:

import ctypes
from ctypes import wintypes
import os
import msvcrt

GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000

OPEN_EXISTING = 3
OPEN_ALWAYS = 4

ACCESS_MODES = {
    "r": GENERIC_READ,
    "w": GENERIC_WRITE,
    "r+": (GENERIC_READ|GENERIC_WRITE)
}

OPEN_MODES = {
    "r": OPEN_EXISTING,
    "w": OPEN_ALWAYS,
    "r+": OPEN_ALWAYS,
}


def open_file_nonblocking(filename, access):
    # Removes the b for binary access.
    internal_access = access.replace("b", "")
    access_mode = ACCESS_MODES[internal_access]
    open_mode = OPEN_MODES[internal_access]
    handle = wintypes.HANDLE(ctypes.windll.kernel32.CreateFileW(
        wintypes.LPWSTR(filename),
        wintypes.DWORD(access_mode),
        wintypes.DWORD(2|1),  # File share read and write
        ctypes.c_void_p(0),
        wintypes.DWORD(open_mode),
        wintypes.DWORD(0),
        wintypes.HANDLE(0)
    ))

    try:
        fd = msvcrt.open_osfhandle(handle.value, 0)
    except OverflowError as exc:
        # Python 3.X
        raise OSError("Failed to open file.") from None
        # Python 2
        # raise OSError("Failed to open file.")

    return os.fdopen(fd, access)

该函数打开文件并共享读写句柄,允许多次访问。然后将句柄转换为普通的Python文件对象。
使用完毕后请务必关闭文件。


0

最近我需要在Python中使用跨平台兼容性对stdin和stdout进行I/O读取。
对于Linux:
对于Linux,我们可以使用select模块。它是posix select函数的封装实现。它允许您传递多个文件描述符,并等待它们准备就绪。一旦它们准备就绪,您将收到通知并可以执行read/write操作。下面是一小段代码,可以给您一个想法:
这里的nodejs是一个带有nodejs镜像的Docker环境。

  stdin_buf = BytesIO(json.dumps(fn) + "\n")
  stdout_buf = BytesIO()
  stderr_buf = BytesIO()

  rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
  wselect = [nodejs.stdin]  # type: List[BytesIO]
  while (len(wselect) + len(rselect)) > 0:
            rready, wready, _ = select.select(rselect, wselect, [])
            try:
                if nodejs.stdin in wready:
                    b = stdin_buf.read(select.PIPE_BUF)
                    if b:
                        os.write(nodejs.stdin.fileno(), b)
                    else:
                        wselect = []
                for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
                    if pipes[0] in rready:
                        b = os.read(pipes[0].fileno(), select.PIPE_BUF)
                        if b:
                            pipes[1].write(b)
                        else:
                            rselect.remove(pipes[0])
                if stdout_buf.getvalue().endswith("\n"):
                    rselect = []
            except OSError as e:
                break  

针对Windows操作系统 此代码示例涉及stdin、stdout的读写操作。 但是,由于在Windows操作系统上,select实现不允许将stdin、stdout作为参数传递,因此此代码无法在Windows操作系统上运行。
文档说明如下:

在Windows上,文件对象不可接受,但套接字可以。在Windows上,底层的select()函数由WinSock库提供,并且不能处理不起源于WinSock的文件描述符。

首先,我必须提到有很多用于Windows上非阻塞I/O读取的库,例如asyncio(Python 3)、gevent(Python 2.7),msvcrt以及pywin32win32event,它们会在您的套接字准备好读/写数据时发出警报。但是,没有一个允许我在stdin/stdout上进行读写,会出现错误,例如:
在非套接字上执行操作
仅处理整数值等等。
还有一些其他库,例如twister,我还没有尝试过。

现在,要在Windows平台上实现与上述代码类似的功能,我使用了线程。以下是我的代码:

    stdin_buf = BytesIO(json.dumps(fn) + "\n")
    stdout_buf = BytesIO()
    stderr_buf = BytesIO()

    rselect = [nodejs.stdout, nodejs.stderr]  # type: List[BytesIO]
    wselect = [nodejs.stdin]  # type: List[BytesIO]
    READ_BYTES_SIZE = 512

    # creating queue for reading from a thread to queue
    input_queue = Queue.Queue()
    output_queue = Queue.Queue()
    error_queue = Queue.Queue()

    # To tell threads that output has ended and threads can safely exit
    no_more_output = threading.Lock()
    no_more_output.acquire()
    no_more_error = threading.Lock()
    no_more_error.acquire()

    # put constructed command to input queue which then will be passed to nodejs's stdin
    def put_input(input_queue):
        while True:
            sys.stdout.flush()
            b = stdin_buf.read(READ_BYTES_SIZE)
            if b:
                input_queue.put(b)
            else:
                break

    # get the output from nodejs's stdout and continue till otuput ends
    def get_output(output_queue):
        while not no_more_output.acquire(False):
            b=os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
            if b:
                output_queue.put(b)

    # get the output from nodejs's stderr and continue till error output ends
    def get_error(error_queue):
        while not no_more_error.acquire(False):
            b = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
            if b:
                error_queue.put(b)

    # Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
    input_thread = threading.Thread(target=put_input, args=(input_queue,))
    input_thread.start()
    output_thread = threading.Thread(target=get_output, args=(output_queue,))
    output_thread.start()
    error_thread = threading.Thread(target=get_error, args=(error_queue,))
    error_thread.start()

    # mark if output/error is ready
    output_ready=False
    error_ready=False

    while (len(wselect) + len(rselect)) > 0:
        try:
            if nodejs.stdin in wselect:
                if not input_queue.empty():
                    os.write(nodejs.stdin.fileno(), input_queue.get())
                elif not input_thread.is_alive():
                    wselect = []
            if nodejs.stdout in rselect:
                if not output_queue.empty():
                    output_ready = True
                    stdout_buf.write(output_queue.get())
                elif output_ready:
                    rselect = []
                    no_more_output.release()
                    no_more_error.release()
                    output_thread.join()

            if nodejs.stderr in rselect:
                if not error_queue.empty():
                    error_ready = True
                    stderr_buf.write(error_queue.get())
                elif error_ready:
                    rselect = []
                    no_more_output.release()
                    no_more_error.release()
                    output_thread.join()
                    error_thread.join()
            if stdout_buf.getvalue().endswith("\n"):
                rselect = []
                no_more_output.release()
                no_more_error.release()
                output_thread.join()
        except OSError as e:
            break

所以对我来说,最好的选择是使用线程。如果你想了解更多信息,可以阅读这篇不错的文章


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接