Socket模块有一个socket.recv_into
方法,因此它可以使用用户定义的bytebuffer
(如bytearray
)进行零拷贝。但是也许BaseEventLoop
没有类似的方法。在asyncio中是否有一种方法可以使用类似socket.recv_into
的方法呢?
Socket模块有一个socket.recv_into
方法,因此它可以使用用户定义的bytebuffer
(如bytearray
)进行零拷贝。但是也许BaseEventLoop
没有类似的方法。在asyncio中是否有一种方法可以使用类似socket.recv_into
的方法呢?
byte_count = await loop.sock_recv_into(sock, buff)
缓冲区是一个可变对象,实现了Python的缓冲协议,例如bytearray和基于bytearray的memoryview。下面的代码演示了使用memoryview将数据接收到bytearray中。
用于asyncio套接字的工作演示代码必须包括一堆脚手架来设置连接的双方并运行事件循环。这里的重点是在下面的sock_read_exactly()协程中使用asyncio的sock_recv_into()。
#!/usr/bin/env python3
"""Demo the asyncio module's sock_recv_into() facility."""
import sys
assert sys.version_info[:2] >= (3, 7), (
'asyncio sock_recv_into() new in Python 3.7')
import socket
import asyncio
def local_echo_server(port=0):
"""Trivial treaded echo server with sleep delay."""
import threading
import time
import random
ssock = socket.socket()
ssock.bind(('127.0.0.1', port))
_, port = ssock.getsockname()
ssock.listen(5)
def echo(csock):
while True:
data = csock.recv(8192)
if not data:
break
time.sleep(random.random())
csock.sendall(data)
csock.shutdown(1)
def serve():
while True:
csock, client_addr = ssock.accept()
tclient = threading.Thread(target=echo, args=(csock,), daemon=True)
tclient.start()
tserve = threading.Thread(target=serve, daemon=True)
tserve.start()
return port
N_COROS = 100
nrunning = 0
async def sock_read_exactly(sock, size, loop=None):
"Read and return size bytes from sock in event-loop loop."
if loop is None: loop = asyncio.get_event_loop()
bytebuff = bytearray(size)
sofar = 0
while sofar < size:
memview = memoryview(bytebuff)[sofar:]
nread = await loop.sock_recv_into(sock, memview)
print('socket', sock.getsockname(), 'read %d bytes' % nread)
if not nread:
raise RuntimeError('Unexpected socket shutdown.')
sofar += nread
return bytebuff
async def echo_client(port):
"Send random data to echo server and test that we get back the same."
from os import urandom
global nrunning
loop = asyncio.get_event_loop()
sock = socket.socket()
sock.setblocking(False)
await loop.sock_connect(sock, ('127.0.0.1', port))
for size in [1, 64, 1024, 55555]:
sending = urandom(size)
await loop.sock_sendall(sock, sending)
received = await sock_read_exactly(sock, size)
assert received == sending
nrunning -= 1
if not nrunning:
loop.stop()
if __name__ == '__main__':
port = local_echo_server()
print('port is', port)
loop = asyncio.get_event_loop()
for _ in range(N_COROS):
loop.create_task(echo_client(port))
nrunning += 1
print('Start loop.')
loop.run_forever()
BaseEventLoop
定义的低级套接字操作需要传入一个 socket.socket
对象,例如 BaseEventLoop.sock_recv(sock, nbytes)
。因此,如果您拥有一个 socket.socket
,可以调用 sock.recv_into()
。但是是否这样做是个问题。
loop.sock_recv
。但是 loop.sock_recv
返回的是 bytes 对象。所以我还要使用 bytearray.extend
方法。例如:ba = bytearray()
bytes_object = yield from loop.sock_recv(some_sock)
ba.extend(bytes_object)
sock_recv
不使用预先创建的固定大小的 bytearray
。 - SeungHyun Hwangprecreated_buffer = bytearray(b' ' * 100); loop.sock_recv_into(some_socket, precreated_buffer, 100)
- SeungHyun Hwangsock.recv_into()
而不是 loop.sock_recv()
。我不知道这是否在 asyncio 框架内是安全的 - 我怀疑不是。 - mhawkesock.recv_into
不是协程,所以很可能会失败 :( - SeungHyun HwangBaseEventLoop
的子类,并提供一个 recv_into()
的协程实现。 - mhawke
AbstractEventLoop.sock_recv_into()
?可以包含一个代码示例。 - Tom Aranda