Python - 在单独的子进程或线程中运行Autobahn|Python asyncio websocket服务器

22

我有一个基于tkinter的GUI程序,运行在Python 3.4.1上。程序中有几个线程从各种URL获取JSON数据。我想添加WebSocket功能,以使程序能够作为服务器运行,并允许多个客户端通过WebSocket连接并交换其他JSON数据。

我尝试使用Autobahn|Python WebSocket服务器来进行异步操作。

我首先尝试在GUI程序下启动一个独立的线程来运行异步事件循环。然而,每次尝试都会给出“AssertionError: There is no current event loop in thread 'Thread-1'”。

然后,我尝试使用标准库multiprocessing包在另一个进程中运行异步事件循环。虽然没有出现任何异常,但WebSocket服务器也没有启动。

是否可能从另一个Python程序中的子进程中运行异步事件循环?

是否有办法将异步事件循环集成到当前的多线程/tkinter程序中?

更新:下面是我尝试进行初始测试的实际代码。

from autobahn.asyncio.websocket import WebSocketServerProtocol
from autobahn.asyncio.websocket import WebSocketServerFactory
import asyncio
from multiprocessing import Process

class MyServerProtocol(WebSocketServerProtocol):

   def onConnect(self, request):
      print("Client connecting: {0}".format(request.peer))

   def onOpen(self):
      print("WebSocket connection open.")

   def onMessage(self, payload, isBinary):
      if isBinary:
         print("Binary message received: {0} bytes".format(len(payload)))

      else:
         print("Text message received: {0}".format(payload.decode('utf8')))

      ## echo back message verbatim
      self.sendMessage(payload, isBinary)

   def onClose(self, wasClean, code, reason):
      print("WebSocket connection closed: {0}".format(reason))

def start_server():
   factory = WebSocketServerFactory("ws://10.241.142.27:6900", debug = False)
   factory.protocol = MyServerProtocol
   loop = asyncio.get_event_loop()
   coro = loop.create_server(factory, '10.241.142.27', 6900)
   server = loop.run_until_complete(coro)
   loop.run_forever()
   server.close()
   loop.close()


websocket_server_process = Process(target = start_server)
websocket_server_process.start()

其中大部分内容来自于Autobahn|Python异步IO示例代码。如果我尝试将其作为进程运行,则不会发生任何事情,没有客户端能够连接它,如果我运行netstat -a,则没有使用端口6900。如果只在主程序中使用start_server(),则可以创建WebSocket服务器。

3个回答

29

首先,你会收到 AssertionError: There is no current event loop in thread 'Thread-1'. 错误,因为 asyncio 要求程序中的每个线程都有自己的事件循环,但它只会在主线程中自动创建一个事件循环。因此,如果你在主线程中调用 asyncio.get_event_loop 一次,它将自动创建一个循环对象并将其设置为默认值,但是如果你在子线程中再次调用它,则会出现该错误。相反,你需要在线程启动时显式地创建/设置事件循环:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

一旦完成此操作,您应该能够在该特定线程中使用get_event_loop()
可以在通过multiprocessing启动的子进程中启动asyncio事件循环。
import asyncio
from multiprocessing import Process 

@asyncio.coroutine
def coro():
    print("hi")

def worker():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coro())

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    p.join()

输出:

hi

唯一需要注意的是,如果在父进程和子进程中都启动了一个事件循环,且你使用的是Unix平台(由于Python bug),则需要在子进程中显式地创建/设置新的事件循环。在Windows上或使用“spawn”multiprocessing上下文中应该可以正常工作。
我认为可以在Tkinter应用程序的后台线程(或进程)中启动一个asyncio事件循环,并使tkinterasyncio事件循环并行运行。只有当你尝试从后台线程/进程更新GUI时才会遇到问题。

我已经更新了原始帖子,并附上了我正在尝试在单独的进程中运行和使用的确切代码。由于某种原因,我使用的代码无法正常工作。此外,我会从一些后台线程更新GUI。如果我尝试运行tkinter和asyncio,可能会遇到什么问题? - user2662241
据我所知,Tkinter不支持从除主线程以外的任何地方更新GUI。尝试这样做可能无法正常工作或导致异常被抛出。 - dano
@user2662241 如果我运行您上面提供的完全相同的代码,最终会在端口6900上监听服务器。(这是使用Python 2.7/trollius而不是Python 3.4/asyncio,因为目前无法访问Python 3.4)唯一的区别是我使用的是本地主机而不是您列出的IP。 - dano
谢谢!我遇到了这个错误信息的问题。我忘记设置事件循环了。 - byxor

5
@dano 的回答可能是正确的,但在大多数情况下会创建一个不必要的新进程。
我通过谷歌发现了这个问题,因为我自己也遇到了同样的问题。我编写了一个应用程序,我希望websocket api不在主线程上运行,并且这导致了你的问题。
我通过阅读Python文档中有关事件循环的内容,并找到了asyncio.new_event_loop和asyncio.set_event_loop函数,解决了这个问题。
我没有使用AutoBahn,而是使用了pypi websockets库,以下是我的解决方案。
import websockets
import asyncio
import threading

class WebSocket(threading.Thread):    
    @asyncio.coroutine
    def handler(self, websocket, path):
        name = yield from websocket.recv()
        print("< {}".format(name))
        greeting = "Hello {}!".format(name)
        yield from websocket.send(greeting)
        print("> {}".format(greeting))

    def run(self):
        start_server = websockets.serve(self.handler, '127.0.0.1', 9091)
        eventloop = asyncio.new_event_loop()
        asyncio.set_event_loop(eventloop)
        eventloop.run_until_complete(start_server)
        eventloop.run_forever()

if __name__ == "__main__":
    ws = WebSocket()
    ws.start()

1
如何通过WebSocket“按需”发送消息,即不通过回调函数?我有一个游戏服务器,希望通过WebSocket与客户端通信,我想调用socket_server.sendMessage(“我的消息”),但只有在将其放入回调中时才能这样做... - bob
@andy,你可以在onConnect()中将每个新连接添加到存储在类变量中的列表中,然后使用类方法来迭代列表以向所有可用连接发送消息(这看起来有点不干净,但对我来说有效)。别忘了在onClose()中从列表中删除连接。 - hecke

2

有没有一种方法可以将asyncio事件循环集成到当前的多线程/tkinter程序中?

是的,您可以使用asyncio事件循环运行您的tkinter程序。以下是一个概念验证。

'''Proof of concept integrating asyncio and tk loops.

Terry Jan Reedy
Run with 'python -i' or from IDLE editor to keep tk window alive.
'''

import asyncio
import datetime as dt
import tkinter as tk

loop = asyncio.get_event_loop()
root = tk.Tk()

# Combine 2 event loop examples from BaseEventLoop doc.
# Add button to prove that gui remain responsive between time updates.
# Prints statements are only for testing.

def flipbg(widget, color):
    bg = widget['bg']
    print('click', bg, loop.time())
    widget['bg'] = color if bg == 'white' else 'white'

hello = tk.Label(root)
flipper = tk.Button(root, text='Change hello background', bg='yellow',
                    command=lambda: flipbg(hello, 'red'))
time = tk.Label(root)
hello.pack()
flipper.pack()
time.pack()

def hello_world(loop):
    hello['text'] = 'Hello World'
loop.call_soon(hello_world, loop)

def display_date(end_time, loop):
    print(dt.datetime.now())
    time['text'] = dt.datetime.now()
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

end_time = loop.time() + 10.1
loop.call_soon(display_date, end_time, loop)

# Replace root.mainloop with these 4 lines.
def tk_update():
    root.update()
    loop.call_soon(tk_update)  # or loop.call_later(delay, tk_update)
# Initialize loop before each run_forever or run_until_complete call    
tk_update() 
loop.run_forever()

我已经实验性地运行了带有这4行代码的IDLE,在语法高亮1000行代码时只会稍微变慢。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接