我创建了一个名为
BufferedStringReader
的类,它作为一个字节流,首先读取字符串,然后是一些无限字节流。我假设你所说的字节流是一个文件,比如块设备或字符设备。虽然这些是我知道的唯一无限的字节流。
简要介绍一下我的实现方式:我的类没有继承自
IOBase
、
BufferedReader
或其他Python IO对象,因为许多成员和方法具有相同的名称。由于这个类必须像字符串流和文件流那样运行,所以这些对象被存储为成员,我的类没有从它们继承。
我决定不使用BufferReader
中的tell()
方法来实现位置,因为当使用(some BufferReader object).tell()
时会出现奇怪的问题:它返回的不是正数,而是在某个块大小的负数和0之间的值。所以当文件被打开时,位置是零,但当你开始从它读取时,位置是-4096加上实际位置,读取4096字节后会回绕。而且块大小在你的系统上可能甚至不是4096,所以非常奇怪。(如果你的情况不同,请告诉我。)
另外,由于你的问题涉及到无限字节流,我的类不支持写入。对于我所举的块设备的例子,它也不支持。
最后一件事是,不支持无限读取,因为它通常用于无限流,这样的调用会耗尽内存。
from io import BufferedReader, StringIO, UnsupportedOperation
class BufferedStringReader:
def __init__(self, string, name, mode):
if mode != 'rb' and mode != 'br':
raise ValueError("invalid mode: " + mode)
self.reader = open(name, mode)
self.stringio = StringIO(string)
self.string = string
self.stringlen = len(string)
self.readerpos = 0
self.pos = self.stringio.tell() + self.readerpos
self.closed = (self.reader.closed and self.stringio.closed)
def __update_state__(self):
self.pos = self.stringio.tell() + self.readerpos
self.closed = (self.reader.closed and self.stringio.closed)
def __repr__(self):
return "{BufferedStringReader reader=" + str(self.reader) + " stringio=" + str(self.stringio) + "}"
def getvalue(self):
raise UnsupportedOperation("BufferedStringReader does not support getvalue()")
def close(self):
self.reader.close()
self.stringio.close()
self.closed = True
def flush(self):
raise UnsupportedOperation("BufferedStringReader does not support flush()")
def detach(self):
self.reader.detach()
def isatty(self):
return False
def peek(self, size=0):
peek_size = (size if size > 0 else 1)
string_pos = self.stringio.tell()
string_read = self.string[string_pos:string_pos+peek_size]
if len(string_read) < peek_size:
reader_read = self.reader.peek(peek_size)
else:
reader_read = b''
return (bytes(string_read, 'utf-8') + reader_read)[0:peek_size]
def read(self, size=-1):
if size <= -1:
raise UnsupportedOperation("BufferedStringReader does not support read(size=-1)")
string_read = ''
reader_read = b''
if self.pos <= self.stringlen and self.pos+size <= self.stringlen:
string_read = self.stringio.read(size);
elif self.pos <= self.stringlen and self.pos+size > self.stringlen:
string_read = self.stringio.read(size);
reader_read = self.reader.read(size-len(string_read));
elif self.pos > self.stringlen:
reader_read = self.reader.read(size);
else:
raise RuntimeError
self.readerpos += len(reader_read)
self.__update_state__()
return bytes(string_read, 'utf-8') + reader_read;
def readable(self):
return True
def read1(self, size=-1):
raise UnsupportedOperation("BufferedStringReader does not support read1()")
def readinto(self, bytarray):
temp_read = self.read(len(bytarray))
temp_array = bytearray(temp_read)
if len(temp_array) <= 0:
return 0
bytarray[0:len(temp_array)] = temp_array
return len(temp_array)
def readinto1(self, bytarray):
raise UnsupportedOperation("BufferedStringReader does not support readinto1()")
def readlines(self, size=-1):
read_size = 0;
if size >= 0:
temp_read = self.peek(size)
newline = temp_read.find(b'\n')
if newline == -1:
read_size = len(temp_read)
else:
read_size = newline+1
else:
newline = -1
while newline == -1:
read_size += 4096
temp_read = self.peek(read_size);
newline = temp_read.find(b'\n')
read_size = newline+1
return self.read(read_size);
def seek(self, pos, whence=0):
if whence==2:
UnsupportedOperation("BufferedStringReader does not support seek(whence=2)")
elif whence==1:
self.pos = max(self.pos+pos, 0)
elif whence==0:
if pos < 0:
raise OSError("[Errno 22] Invalid argument")
self.pos = pos
else:
raise ValueError("whence value " + str(whence) + " unsupported")
self.stringio.seek(min(self.pos, self.stringlen), 0)
self.reader.seek(max(self.pos - self.stringlen, 0), 0)
self.readerpos = max(self.pos - self.stringlen, 0)
self.__update_state__()
return self.pos
def seekable(self):
return True
def tell(self):
return self.pos
def truncate(self, pos=None):
raise UnsupportedOperation("BufferedStringReader does not support truncate()")
def write(self, s):
raise UnsupportedOperation("BufferedStringReader does not support write()")
def writelines(self, s):
raise UnsupportedOperation("BufferedStringReader does not support writelines()")
def writable(self):
return False
reader = BufferedStringReader("something", "/dev/urandom", "rb")
print(reader)
assert reader.readable() == True
assert reader.seekable() == True
assert reader.writable() == False
assert reader.isatty() == False
assert reader.tell() == 0
assert reader.read(4) == b"some"
assert reader.tell() == 4
reader.seek(pos=-2, whence=1)
assert reader.read(2) == b"me"
reader.seek(pos=0, whence=0)
assert reader.tell() == 0
print(reader.read(12))
assert reader.tell() == 12
print(reader.read(12))
assert reader.tell() == 24
zero_array = bytearray(0)
assert reader.readinto(zero_array) == 0
assert zero_array == b''
zero_array = bytearray(10)
assert reader.readinto(zero_array) == 10
print(zero_array)
reader.seek(pos=0, whence=0)
assert reader.peek(4) == b"some"
assert reader.read(4) == b"some"
assert reader.tell() == 4
print(reader.peek(8))
print(reader.read(8))
assert reader.tell() == 12
print(reader.peek(8))
assert reader.tell() == 12
reader.close()
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")
assert reader.readlines() == b"something\n"
assert reader.readlines(3) == b"spa"
assert reader.readlines() == b"m\n"
assert reader.readlines(5) == b"eggs\n"
reader.detach()
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")
try:
reader.flush()
except UnsupportedOperation as e:
print(e)
try:
reader.getvalue()
except UnsupportedOperation as e:
print(e)
try:
reader.read(size=-1)
except UnsupportedOperation as e:
print(e)
try:
reader.read1(10)
except UnsupportedOperation as e:
print(e)
try:
reader.readinto1(10)
except UnsupportedOperation as e:
print(e)
try:
reader.seek(pos=100, whence=2)
except UnsupportedOperation as e:
print(e)
try:
reader.truncate(10)
except UnsupportedOperation as e:
print(e)
try:
reader.write('eggs')
except UnsupportedOperation as e:
print(e)
try:
reader.writelines('eggs')
except UnsupportedOperation as e:
print(e)
try:
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "wb")
except ValueError as e:
print(e)
以下是使用
BufferedStringReader
的一些示例:
>>> reader = BufferedStringReader("something", "/dev/urandom", "rb")
>>> bytes_object = reader.read(20)
>>>
>>> print(bytes_object)
b'somethingc\xab\xf6\xab\xea\xd1q C(\x05'