使用StreamReader解码UTF-8的Asyncio

14
我正在逐渐适应asyncio并发编程,并发任务的处理方式非常不错,但是将异步库与传统IO库混合使用可能会很困难。我目前面临的问题是如何正确解码异步StreamReader。
最简单的解决方案是使用read()方法读取字节串块,然后对每个块进行解码——请参见下面的代码。(在我的程序中,我不会打印每个块,而是将其解码为字符串并发送到另一个方法进行处理):
import asyncio
import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = r.content
    while not stream.at_eof():
        data = await stream.read(4)
        print(data.decode('utf-8'))

这个方法很好用,直到出现一个被分割在两个块中的 utf-8 字符。例如,如果响应是 b'M\xc3\xa4dchen mit Bi\xc3\x9f\n',那么读取 3 个块会起作用,但读取 4 个块不会起作用(因为 \xc3\x9f 在不同的块中,解码以 \xc3 结尾的块将引发以下错误:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data

我查看了这个问题的正确解决方案,在阻塞世界中至少有 io.TextIOWrapper 或 codecs.StreamReaderWriter(它们的区别在 PEP 0400 中讨论)。然而,这两种方法都依赖于典型的阻塞流。

我花了 30 分钟搜索 asyncio 的示例,但一直找到的都是我的 decode() 解决方案。有人知道更好的解决方案吗?还是说这是 Python asyncio 中缺失的功能之一?

供参考,以下是使用两个“标准”解码器与异步流的结果。

使用编解码器流读取器:
r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)

异常:

File "echo_client.py", line 13, in get_data
  data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
  data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator

这里直接调用了read()方法,而没有使用yield fromawait

我还尝试使用io.TextIOWrapper包装流对象:

stream = TextIOWrapper(r.content)

但这导致以下结果:
File "echo_client.py", line 10, in get_data
  stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'

附注:如果您需要此问题的样本测试用例,请查看this gist。 您可以使用python3.5运行它以重现错误。 如果将块大小从4更改为3(或30),它将正确工作。

编辑

被接受的答案完美地解决了这个问题。 谢谢! 如果其他人也遇到此问题,这是我制作的一个简单的包装器类,可处理StreamReader上的解码:

import codecs

class DecodingStreamReader:
    def __init__(self, stream, encoding='utf-8', errors='strict'):
        self.stream = stream
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)

    async def read(self, n=-1):
        data = await self.stream.read(n)
        if isinstance(data, (bytes, bytearray)):
            data = self.decoder.decode(data)
        return data

    def at_eof(self):
        return self.stream.at_eof() 

2
在有人问为什么我不只是将整个响应加载到内存中之前,请考虑网络套接字或长保持活动的流(例如couchdb的连续模式下的_changes feed)。我想要解析每个传入的数据,因为它到达时,而不必等待(可能几分钟)HTTP连接关闭。 - Ethan Frey
可能与这个 bug 有关:https://bugs.python.org/issue31087 - Adam
1个回答

8
你可以使用一个 IncrementalDecoder
Utf8Decoder = codecs.getincrementaldecoder('utf-8')

使用您的示例:

decoder = Utf8Decoder(error='strict')
while not stream.at_eof():
    data = await stream.read(4)
    print(decoder.decode(data), end='')

输出:

Mädchen mit Biß

1
太好了!我试过了,它完美地运行了。感谢您的快速回答。 - Ethan Frey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接