如何使用asyncio实现recvmsg()函数?

4

由于在asyncio模块中缺少recvmsg()函数,因此我尝试以与BaseEventLoop.sock_recv()相同的方式重新实现它:

import asyncio, socket


def _sock_recvmsg(loop, fut, registered, sock, bufsize, ancbufsize):
    self = loop
    fd = sock.fileno()
    if registered: self.remove_reader(fd)
    if fut.cancelled(): return
    try: data = sock.recvmsg(bufsize, ancbufsize)
    except (BlockingIOError, InterruptedError): self.add_reader(fd, self._sock_recvmsg, fut, True, sock, bufsize, ancbufsize)
    except Exception as exc: fut.set_exception(exc)
    else: fut.set_result(data)


def sock_recvmsg(loop, sock, bufsize, ancbufsize=0):
    self = loop
    if self._debug and sock.gettimeout() != 0: raise ValueError('the socket must be non-blocking')
    fut = asyncio.futures.Future(loop=self)
    self._sock_recvmsg(fut, False, sock, bufsize, ancbufsize)
    return fut


asyncio.unix_events._UnixSelectorEventLoop._sock_recvmsg = _sock_recvmsg
asyncio.unix_events._UnixSelectorEventLoop.sock_recvmsg = sock_recvmsg

但是这个简单的测试失败了,只有第一个值被接收到,之后测试就会挂起:
async def produce():
    for i in range(5):
        end_out.sendmsg([bytes([i])])
        await asyncio.sleep(1)

    end_out.close()

async def consume():
    while True:
        v, a, b, c = await asyncio.get_event_loop().sock_recvmsg(end_in, 1)
        if v == b'': return
        print(v)

if __name__ == '__main__':
    end_in, end_out = socket.socketpair()
    asyncio.ensure_future(produce())
    asyncio.get_event_loop().run_until_complete(consume())

我漏掉了什么?

1
谢谢你提供这个宝石。 - MB.
1个回答

2

recvmsg()相关的代码是正确的,但我错过了些什么。

end_in.setblocking(0)
end_out.setblocking(0)

这太蠢了,所以我正在考虑删除那个问题。


1
这对我来说实际上是一个很大的时间节省,因为现在我基本上可以复制粘贴你的实现 ;) - Maarten

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接