使用Python Tornado在另一个线程中发送WebSocket消息

6
我希望使用Python中的WebSockets将我从PySerial读取的数据推送给web客户端,以便让其实时更新。目前,我正在使用以下代码通过一个单独的线程来连续读取串行数据。
def read_from_port():
    while running:
        reading = ser.readline().decode()
        handle_data(reading)

thread = threading.Thread(target=read_from_port)
thread.daemon = True
thread.start()

我正在对串行数据进行一些处理,然后如果计算结果与其先前值不同,想要向所有连接的WebSocket客户端广播一条消息。为此,我设置了以下代码:

clients = []

def Broadcast(message):
    for client in clients:
        client.sendMessage(json.dumps(message).encode('utf8'))
        print("broadcasted")

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)
      
    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)
 
    def on_close(self):
        print('connection closed')
        clients.remove(self)
 
    def check_origin(self, origin):
        return True
 
application = tornado.web.Application([
    (r'/ws', WSHandler),
])
 
if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.instance().start()

我希望您能在工作线程中使用“broadcast”方法广播结果。但是在工作线程中使用该方法会产生以下错误。
File "main.py", line 18, in Broadcast
    client.write_message(message)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
    return self.ws_connection.write_message(message, binary=binary)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
    fut = self._write_frame(True, opcode, message, flags=flags)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
    return self.stream.write(frame)
  File "/usr/local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
    future = Future()  # type: Future[None]
  File "/usr/local/Cellar/python@3.8/3.8.3_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.

我理解的问题是Tornado的write_message函数不是线程安全的,因为我试图直接从工作线程调用该函数导致此错误。据我所知,使用Tornado进行并发编程的推荐方法是通过asyncio,但在这种情况下,我认为采用线程方法可能更加合适,因为我有一个基本上一直并行运行的循环。
然而,我对asyncio以及Python中线程的实现知之甚少,因此我想找出从不同线程发送WebSocket消息的最简单方法是什么。

2
只是一点提醒,将方法或函数名称以大写字母开头的方式命名是一个不好的想法,因为这样很难理解该名称是类还是函数。约定俗成的做法是对类使用大写字母命名,对函数使用小写字母命名。 - Tõnis M
Python中的多线程很好用。问题在于这些可怕的Web服务器软件包。是的,上面的代码会失败——有时iostream是从调用线程写入的,有时是从“事件循环”写入的。因此,在iostream中断言会失败。整个过程是不必要地复杂化了。 - personal_cloud
2个回答

3

阅读官方文档,了解如何同时使用asyncio和多线程在https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading给了我必要的提示,可以通过使用"call_soon_threadsafe"函数相当优雅地实现。因此,以下代码似乎可以完成任务。

tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)

clients = []

def bcint(message):
    for client in clients:
        client.write_message(message)
        print("broadcasted")

def Broadcast(message):
    io_loop.asyncio_loop.call_soon_threadsafe(bcint, message)

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)
      
    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)
 
    def on_close(self):
        print('connection closed')
        clients.remove(self)
 
    def check_origin(self, origin):
        return True
 
application = tornado.web.Application([
    (r'/ws', WSHandler),
])
 
if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.current().start()

AttributeError: 'module'对象没有属性'asyncio_loop'。也许您的答案假定了Python 3?但该问题并未说明Python 3。 - personal_cloud
@personal_cloud Python2已于2020年停止更新。 - Jeppe
@Jeppe 是的。Python基金会正在放弃Python2、Fortran和其他各种语言。这很好。然而,Python3仍然不兼容基本的Python2功能,如字节串和打印语法。因此,所有用Python2编写的软件仍然是Python2。这些软件必须在Python2中进行维护和改进。就像Fortran、Modula-3等软件一样,无论语言被宣布为多么“死亡”,它们仍然使用其原始语言进行维护。 - personal_cloud
我的观点是,除非另有说明,否则应始终假定为python3。此答案和原始帖子中包含的代码均为python3。 - Jeppe

0
一个更清晰的选项是使用诸如pyzmq之类的队列,这将帮助您建立从一个线程到另一个线程的通信。
观察您的用例,您可以使用PUB / SUB模型。 这里是一个示例代码。 此外,您可以使用'inproc'而不是'tcp'。 这将减少延迟,因为您将在同一进程中在多个线程之间通信。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接