尝试读取文件的程序代码大致如下:
with codecs.open(datapath, "r", 'utf-16') as raw_data:
raw_data_x = raw_data.readlines()
我必须使用 "codecs" 打开文件,因为它是 Unicode 编码的。
with codecs.open(datapath, "r", 'utf-16') as raw_data:
raw_data_x = raw_data.readlines()
我必须使用 "codecs" 打开文件,因为它是 Unicode 编码的。
经过长时间的努力,我终于创建了一个使用ctypes实现的函数来帮助您完成这个任务。请注意,如果进程已经获得了“独占”访问权限,此方法将无效。如果是这样,您需要使用影子复制服务,例如这里或者这里所示的实现。
不管怎样,以下是代码:
import ctypes
from ctypes import wintypes
import os
import msvcrt
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
OPEN_EXISTING = 3
OPEN_ALWAYS = 4
ACCESS_MODES = {
"r": GENERIC_READ,
"w": GENERIC_WRITE,
"r+": (GENERIC_READ|GENERIC_WRITE)
}
OPEN_MODES = {
"r": OPEN_EXISTING,
"w": OPEN_ALWAYS,
"r+": OPEN_ALWAYS,
}
def open_file_nonblocking(filename, access):
# Removes the b for binary access.
internal_access = access.replace("b", "")
access_mode = ACCESS_MODES[internal_access]
open_mode = OPEN_MODES[internal_access]
handle = wintypes.HANDLE(ctypes.windll.kernel32.CreateFileW(
wintypes.LPWSTR(filename),
wintypes.DWORD(access_mode),
wintypes.DWORD(2|1), # File share read and write
ctypes.c_void_p(0),
wintypes.DWORD(open_mode),
wintypes.DWORD(0),
wintypes.HANDLE(0)
))
try:
fd = msvcrt.open_osfhandle(handle.value, 0)
except OverflowError as exc:
# Python 3.X
raise OSError("Failed to open file.") from None
# Python 2
# raise OSError("Failed to open file.")
return os.fdopen(fd, access)
该函数打开文件并共享读写句柄,允许多次访问。然后将句柄转换为普通的Python文件对象。
使用完毕后请务必关闭文件。
最近我需要在Python中使用跨平台兼容性对stdin和stdout进行I/O读取。
对于Linux:
对于Linux,我们可以使用select模块。它是posix select
函数的封装实现。它允许您传递多个文件描述符,并等待它们准备就绪。一旦它们准备就绪,您将收到通知并可以执行read/write
操作。下面是一小段代码,可以给您一个想法:
这里的nodejs是一个带有nodejs镜像的Docker环境。
stdin_buf = BytesIO(json.dumps(fn) + "\n")
stdout_buf = BytesIO()
stderr_buf = BytesIO()
rselect = [nodejs.stdout, nodejs.stderr] # type: List[BytesIO]
wselect = [nodejs.stdin] # type: List[BytesIO]
while (len(wselect) + len(rselect)) > 0:
rready, wready, _ = select.select(rselect, wselect, [])
try:
if nodejs.stdin in wready:
b = stdin_buf.read(select.PIPE_BUF)
if b:
os.write(nodejs.stdin.fileno(), b)
else:
wselect = []
for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
if pipes[0] in rready:
b = os.read(pipes[0].fileno(), select.PIPE_BUF)
if b:
pipes[1].write(b)
else:
rselect.remove(pipes[0])
if stdout_buf.getvalue().endswith("\n"):
rselect = []
except OSError as e:
break
针对Windows操作系统
此代码示例涉及stdin、stdout的读写操作。
但是,由于在Windows操作系统上,select实现不允许将stdin、stdout作为参数传递,因此此代码无法在Windows操作系统上运行。
文档说明如下:
在Windows上,文件对象不可接受,但套接字可以。在Windows上,底层的select()函数由WinSock库提供,并且不能处理不起源于WinSock的文件描述符。
首先,我必须提到有很多用于Windows上非阻塞I/O读取的库,例如asyncio
(Python 3)、gevent
(Python 2.7),msvcrt
以及pywin32
的win32event
,它们会在您的套接字准备好读/写
数据时发出警报。但是,没有一个允许我在stdin/stdout
上进行读写,会出现错误,例如:
在非套接字上执行操作
仅处理整数值
等等。
还有一些其他库,例如twister
,我还没有尝试过。
现在,要在Windows平台上实现与上述代码类似的功能,我使用了线程
。以下是我的代码:
stdin_buf = BytesIO(json.dumps(fn) + "\n")
stdout_buf = BytesIO()
stderr_buf = BytesIO()
rselect = [nodejs.stdout, nodejs.stderr] # type: List[BytesIO]
wselect = [nodejs.stdin] # type: List[BytesIO]
READ_BYTES_SIZE = 512
# creating queue for reading from a thread to queue
input_queue = Queue.Queue()
output_queue = Queue.Queue()
error_queue = Queue.Queue()
# To tell threads that output has ended and threads can safely exit
no_more_output = threading.Lock()
no_more_output.acquire()
no_more_error = threading.Lock()
no_more_error.acquire()
# put constructed command to input queue which then will be passed to nodejs's stdin
def put_input(input_queue):
while True:
sys.stdout.flush()
b = stdin_buf.read(READ_BYTES_SIZE)
if b:
input_queue.put(b)
else:
break
# get the output from nodejs's stdout and continue till otuput ends
def get_output(output_queue):
while not no_more_output.acquire(False):
b=os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
if b:
output_queue.put(b)
# get the output from nodejs's stderr and continue till error output ends
def get_error(error_queue):
while not no_more_error.acquire(False):
b = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
if b:
error_queue.put(b)
# Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
input_thread = threading.Thread(target=put_input, args=(input_queue,))
input_thread.start()
output_thread = threading.Thread(target=get_output, args=(output_queue,))
output_thread.start()
error_thread = threading.Thread(target=get_error, args=(error_queue,))
error_thread.start()
# mark if output/error is ready
output_ready=False
error_ready=False
while (len(wselect) + len(rselect)) > 0:
try:
if nodejs.stdin in wselect:
if not input_queue.empty():
os.write(nodejs.stdin.fileno(), input_queue.get())
elif not input_thread.is_alive():
wselect = []
if nodejs.stdout in rselect:
if not output_queue.empty():
output_ready = True
stdout_buf.write(output_queue.get())
elif output_ready:
rselect = []
no_more_output.release()
no_more_error.release()
output_thread.join()
if nodejs.stderr in rselect:
if not error_queue.empty():
error_ready = True
stderr_buf.write(error_queue.get())
elif error_ready:
rselect = []
no_more_output.release()
no_more_error.release()
output_thread.join()
error_thread.join()
if stdout_buf.getvalue().endswith("\n"):
rselect = []
no_more_output.release()
no_more_error.release()
output_thread.join()
except OSError as e:
break
所以对我来说,最好的选择是使用线程。如果你想了解更多信息,可以阅读这篇不错的文章。
fcntl
,而OP正在使用Windows。 - cdarkemultiprocessing
或asyncio
如何使用它)无关。 - jfs