使用Websockets和asyncio同时监听多个socket

6

我正在尝试使用Websockets和asyncio在Python中创建一个脚本,以侦听多个套接字。问题是无论我做什么,它只会侦听我调用的第一个套接字。我认为这是由于无限循环造成的,解决此问题的选项是什么?为每个套接字使用线程?

  async def start_socket(self, event):
    payload = json.dumps(event)
    loop = asyncio.get_event_loop()

    self.tasks.append(loop.create_task(
        self.subscribe(event)))

    # this should not block the rest of the code
    await asyncio.gather(*tasks)


  def test(self):
    # I want to be able to add corotines at a different time
    self.start_socket(event1)
    # some code
    self.start_socket(event2)
3个回答

6

最终我做的是这样,这样就不会阻塞主线程,所有的订阅都可以并行工作。

def subscribe(self, payload):
    ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
    ws.connect(url)
    ws.send(payload)
    while True:
        result = ws.recv()
        print("Received '%s'" % result)

def start_thread(self, loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def start_socket(self, **kwargs):
    worker_loop = asyncio.new_event_loop()
    worker = Thread(target=self.start_thread, args=(worker_loop,))
    worker.start()

    worker_loop.call_soon_threadsafe(self.subscribe, payload)


def listen(self):
    self.start_socket(payload1)
    # code
    self.start_socket(payload2)
    # code
    self.start_socket(payload3)

2

您的代码似乎不完整,但是您展示的内容有两个问题。其中一个是run_until_complete只接受协程对象(或其他类型的future),而不是协程函数。因此应该是:

# note parentheses after your_async_function()
asyncio.get_event_loop().run_until_complete(your_async_function())

问题在于,无论我做什么,它只听取我调用的第一个套接字。 我认为这是无限循环,我有什么解决方法? 使用线程处理每个套接字?
无限循环不是问题,asyncio旨在支持此类“无限循环”。问题在于你正在尝试在一个协程中完成所有操作,而你应该为每个websocket创建一个协程。这不是问题,因为协程非常轻量级。
例如(未经测试):
async def subscribe_all(self, payload):
    loop = asyncio.get_event_loop()
    # create a task for each URL
    for url in url_list:
        tasks.append(loop.create_task(self.subscribe_one(url, payload)))
    # run all tasks in parallel
    await asyncio.gather(*tasks)

async def subsribe_one(self, url, payload):
    async with websockets.connect(url) as websocket:
        await websocket.send(payload)
        while True:
            msg = await websocket.recv()
            print(msg)

await asyncio.gather(*tasks) 给我一个 RuntimeWarning: coroutine 'subscribe_all' was never awaited (the async function isn't executed)。如果我使用 run_until_complete(gather(..)),它可以工作,但会阻塞线程并且之后的代码不会被执行。 - joseRo
@joseRo 没有其他代码很难确定发生了什么。你可能需要从另一个协程中等待subscribe_all... - user4815162342
@joseRo 谁调用了 test() 函数? - user4815162342
另一个文件,只需导入并调用它。 - joseRo
常规函数,事件循环在start_socket函数中启动(我附加的代码)。 - joseRo
显示剩余2条评论

2

有效地监听来自websocket服务器的多个连接的一种方法是保持已连接客户端列表,并在并行处理多个会话时进行操作。

例如,一个简单的服务器每隔几秒钟向每个连接的客户端发送随机数字:

import os
import asyncio
import websockets
import random 

websocket_clients = set()

async def handle_socket_connection(websocket, path):
    """Handles the whole lifecycle of each client's websocket connection."""
    websocket_clients.add(websocket)
    print(f'New connection from: {websocket.remote_address} ({len(websocket_clients)} total)')
    try:
        # This loop will keep listening on the socket until its closed. 
        async for raw_message in websocket:
            print(f'Got: [{raw_message}] from socket [{id(websocket)}]')
    except websockets.exceptions.ConnectionClosedError as cce:
        pass
    finally:
        print(f'Disconnected from socket [{id(websocket)}]...')
        websocket_clients.remove(websocket)

async def broadcast_random_number(loop):
    """Keeps sending a random # to each connected websocket client"""
    while True:
        for c in websocket_clients:
            num = str(random.randint(10, 99))
            print(f'Sending [{num}] to socket [{id(c)}]')
            await c.send(num)
        await asyncio.sleep(2)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        socket_server = websockets.serve(handle_socket_connection, 'localhost', 6789)
        print(f'Started socket server: {socket_server} ...')
        loop.run_until_complete(socket_server)
        loop.run_until_complete(broadcast_random_number(loop))
        loop.run_forever()
    finally:
        loop.close()
        print(f"Successfully shutdown [{loop}].")

一个简单的客户端与服务器连接并监听数字的程序:
import asyncio
import random
import websockets

async def handle_message():
    uri = "ws://localhost:6789"
    async with websockets.connect(uri) as websocket:
        msg = 'Please send me a number...'
        print(f'Sending [{msg}] to [{websocket}]')
        await websocket.send(msg)
        while True:
            got_back = await websocket.recv()
            print(f"Got: {got_back}")

asyncio.get_event_loop().run_until_complete(handle_message())

混淆线程和asyncio的麻烦大于其价值,而且你仍然会有代码在最浪费时间的步骤上阻塞(这是使用asyncio的基本好处)。您需要在事件循环中异步运行每个协程,使用await调用任何阻塞调用,并使用async定义与任何可等待交互的每个方法。请参见一个工作示例:https://github.com/adnantium/websocket_client_server

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接