在asyncio协议/服务器之间进行通信

3
我正在尝试编写一个Server Side Events服务器,我可以通过telnet连接并将telnet内容推送到浏览器。使用Python和asyncio的想法是尽可能少地使用CPU,因为它将在树莓派上运行。
到目前为止,我已经有了以下代码,它使用了在此处找到的库:https://pypi.python.org/pypi/asyncio-sse/0.1,该库使用了asyncio。
我还复制了一个使用asyncio的telnet服务器。
两者分别工作,但我不知道如何将它们绑定在一起。据我所知,我需要从Telnet.data_received内部调用SSEHandler类中的send(),但我不知道如何访问它。这两个“服务器”都需要在循环中运行以接受新连接或推送数据。
有人能帮忙或指点吗?
import asyncio
import sse

# Get an instance of the asyncio event loop
loop = asyncio.get_event_loop()

# Setup SSE address and port
sse_host, sse_port = '192.168.2.25', 8888

class Telnet(asyncio.Protocol):
    def connection_made(self, transport):
        print("Connection received!");
        self.transport = transport

    def data_received(self, data):
        print(data)
        self.transport.write(b'echo:')
        self.transport.write(data)

        # This is where I want to send data via SSE
        # SSEHandler.send(data)

        # Things I've tried :(
        #loop.call_soon_threadsafe(SSEHandler.handle_request());
        #loop.call_soon_threadsafe(sse_server.send("PAH!"));

    def connection_lost(self, esc):
        print("Connection lost!")
        telnet_server.close()

class SSEHandler(sse.Handler):
    @asyncio.coroutine
    def handle_request(self):
        self.send('Working')

# SSE server
sse_server = sse.serve(SSEHandler, sse_host, sse_port)

# Telnet server
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '192.168.2.25', 7777))

#telnet_server.something = sse_server;

loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())
1个回答

7
服务器端事件是一种http协议;您可以在任何给定时刻拥有任意数量的并发http请求,如果没有人连接,则可以为零,或者可能有几十个。所有这些细节都包含在两个“sse.serve”和“sse.Handler”构造中;前者表示单个侦听端口,将每个单独的客户端请求分派到后者。
此外,“sse.Handler.handle_request()”针对每个客户端调用一次,并且一旦该协程终止,客户端就会断开连接。在您的代码中,该协程立即终止,因此客户端看到一个“正在工作”的事件。所以,我们需要等待,或多或少地永远等待。我们可以通过“yield from”一个“asyncio.Future()”来实现。
第二个问题是我们需要以某种方式获取所有单独的SSEHandler()实例,并在每个实例上使用send()方法。好的,我们可以让每个实例在它们的handle_request()方法中自我注册;通过将每个实例添加到将各个处理程序实例映射到它们正在等待的future的字典中。
class SSEHandler(sse.Handler):
    _instances = {}

    @asyncio.coroutine
    def handle_request(self):
        self.send('Working')
        my_future = asyncio.Future()
        SSEHandler._instances[self] = my_future
        yield from my_future

现在,要向所有正在监听的对象发送事件,我们只需访问我们创建的字典中注册的所有SSEHandler实例,并在每个实例上使用send()

class SSEHandler(sse.Handler):

    #...

    @classmethod
    def broadcast(cls, message):
        for instance, future in cls._instances.items():
            instance.send(message)

class Telnet(asyncio.Protocol):

    #...

    def data_received(self, data):
        #...
        SSEHandler.broadcast(data.decode('ascii'))

最后,当telnet连接关闭时,您的代码将退出。虽然这很好,但我们也应该在此时清理内容。幸运的是,这只是为所有处理程序的所有future设置结果的问题。

class SSEHandler(sse.Handler):

    #...

    @classmethod
    def abort(cls):
        for instance, future in cls._instances.items():
            future.set_result(None)
        cls._instances = {}

class Telnet(asyncio.Protocol):

    #...

    def connection_lost(self, esc):
        print("Connection lost!")
        SSEHandler.abort()
        telnet_server.close()

以下是完整的可用转储,以防我的示例不够清晰。
import asyncio
import sse

loop = asyncio.get_event_loop()
sse_host, sse_port = '0.0.0.0', 8888

class Telnet(asyncio.Protocol):
    def connection_made(self, transport):
        print("Connection received!");
        self.transport = transport

    def data_received(self, data):
        SSEHandler.broadcast(data.decode('ascii'))

    def connection_lost(self, esc):
        print("Connection lost!")
        SSEHandler.abort()
        telnet_server.close()

class SSEHandler(sse.Handler):
    _instances = {}
    @classmethod
    def broadcast(cls, message):
        for instance, future in cls._instances.items():
            instance.send(message)

    @classmethod
    def abort(cls):
        for instance, future in cls._instances.items():
            future.set_result(None)
        cls._instances = {}

    @asyncio.coroutine
    def handle_request(self):
        self.send('Working')
        my_future = asyncio.Future()
        SSEHandler._instances[self] = my_future
        yield from my_future

sse_server = sse.serve(SSEHandler, sse_host, sse_port)
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '0.0.0.0', 7777))
loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())

非常感谢您解释了如何以及为什么,非常有用。看来我还有很多要学习的。我已经运行了您的示例代码,它正好做我想要/需要的事情,并且CPU使用率非常低。再次感谢。 - Steve Parker

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接