Python 3 asyncio中是否有类似于socket.recv_into的操作?

5

Socket模块有一个socket.recv_into方法,因此它可以使用用户定义的bytebuffer(如bytearray)进行零拷贝。但是也许BaseEventLoop没有类似的方法。在asyncio中是否有一种方法可以使用类似socket.recv_into的方法呢?

3个回答

1
更新:从我写这篇文章的时候起,Python 3.7.0已经开始发布alpha版本,标准库的asyncio模块文档中包含了AbstractEventLoop.sock_recv_into()方法。
编辑:根据要求扩展回答...
调用asyncio的sock_recv_into()通常看起来像这样:
byte_count = await loop.sock_recv_into(sock, buff)

缓冲区是一个可变对象,实现了Python的缓冲协议,例如bytearray和基于bytearray的memoryview。下面的代码演示了使用memoryview将数据接收到bytearray中。

用于asyncio套接字的工作演示代码必须包括一堆脚手架来设置连接的双方并运行事件循环。这里的重点是在下面的sock_read_exactly()协程中使用asyncio的sock_recv_into()。

#!/usr/bin/env python3

"""Demo the asyncio module's sock_recv_into() facility."""

import sys
assert sys.version_info[:2] >= (3, 7), (
        'asyncio sock_recv_into() new in Python 3.7')

import socket
import asyncio

def local_echo_server(port=0):
    """Trivial treaded echo server with sleep delay."""
    import threading
    import time
    import random
    ssock = socket.socket()
    ssock.bind(('127.0.0.1', port))
    _, port = ssock.getsockname()
    ssock.listen(5)
    def echo(csock):
        while True:
            data = csock.recv(8192)
            if not data:
                break
            time.sleep(random.random())
            csock.sendall(data)
        csock.shutdown(1)
    def serve():
        while True:
            csock, client_addr = ssock.accept()
            tclient = threading.Thread(target=echo, args=(csock,), daemon=True)
            tclient.start()
    tserve = threading.Thread(target=serve, daemon=True)
    tserve.start()
    return port


N_COROS = 100
nrunning = 0

async def sock_read_exactly(sock, size, loop=None):
    "Read and return size bytes from sock in event-loop loop."
    if loop is None: loop = asyncio.get_event_loop()
    bytebuff = bytearray(size)
    sofar = 0
    while sofar < size:
        memview = memoryview(bytebuff)[sofar:]
        nread = await loop.sock_recv_into(sock, memview)
        print('socket', sock.getsockname(), 'read %d bytes' % nread)
        if not nread:
            raise RuntimeError('Unexpected socket shutdown.')
        sofar += nread
    return bytebuff

async def echo_client(port):
    "Send random data to echo server and test that we get back the same."
    from os import urandom
    global nrunning
    loop = asyncio.get_event_loop()
    sock = socket.socket()
    sock.setblocking(False)
    await loop.sock_connect(sock, ('127.0.0.1', port))
    for size in [1, 64, 1024, 55555]:
        sending = urandom(size)
        await loop.sock_sendall(sock, sending)
        received = await sock_read_exactly(sock, size)
        assert received == sending
    nrunning -= 1
    if not nrunning:
        loop.stop()


if __name__ == '__main__':
    port = local_echo_server()
    print('port is', port)
    loop = asyncio.get_event_loop()
    for _ in range(N_COROS):
        loop.create_task(echo_client(port))
        nrunning += 1
    print('Start loop.')
    loop.run_forever()

你能否扩展一下你的回答,解释如何使用AbstractEventLoop.sock_recv_into()?可以包含一个代码示例。 - Tom Aranda

1

BaseEventLoop 定义的低级套接字操作需要传入一个 socket.socket 对象,例如 BaseEventLoop.sock_recv(sock, nbytes)。因此,如果您拥有一个 socket.socket,可以调用 sock.recv_into()。但是是否这样做是个问题。


我已经使用了 loop.sock_recv。但是 loop.sock_recv 返回的是 bytes 对象。所以我还要使用 bytearray.extend 方法。例如:ba = bytearray() bytes_object = yield from loop.sock_recv(some_sock) ba.extend(bytes_object)sock_recv 不使用预先创建的固定大小的 bytearray - SeungHyun Hwang
我想要一种像这样的方法:precreated_buffer = bytearray(b' ' * 100); loop.sock_recv_into(some_socket, precreated_buffer, 100) - SeungHyun Hwang
@SeungHyunHwang:我理解你的意思,但是我建议你可以在套接字上调用 sock.recv_into() 而不是 loop.sock_recv()。我不知道这是否在 asyncio 框架内是安全的 - 我怀疑不是。 - mhawke
谢谢您的评论 @mhawke :) 我会尝试一下!因为 sock.recv_into 不是协程,所以很可能会失败 :( - SeungHyun Hwang
@SeungHyunHwang: 您可以创建 BaseEventLoop 的子类,并提供一个 recv_into() 的协程实现。 - mhawke

1
你可以实现自己的asyncio传输,利用.recv_into()函数,但目前asyncio没有开箱即用的方法来使用.recv_into()。个人认为速度提升不会很大:当你使用C进行开发时,零拷贝非常重要,但对于像Python这样的高级语言,好处要少得多。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接