使用Flask和WebSocket-for-Python (ws4py)为IPython Notebook创建WebSockets代理

7

ipython-notebook-proxy 启发,基于 ipydra 并扩展后者以支持更复杂的用户认证和代理,因为在我的用例中只能公开端口80。

我正在使用 flask-sockets 作为 gunicorn 工作进程,但我遇到了代理 WebSockets 的问题。IPython 使用三个不同的 WebSockets 连接,/shell/stdin/iopub,但我只能获得前两个的 101 Switching Protocols。而且 /stdin 在创建后立即收到一个 Connection Close Frame

以下是有关此代码的摘录:

# Flask imports...
from werkzeug import LocalProxy
from ws4py.client.geventclient import WebSocketClient

# I use my own LocalProxy because flask-sockets does not support Werkzeug Rules
websocket = LocalProxy(lambda: request.environ.get('wsgi.websocket', None))
websockets = {}

PROXY_DOMAIN = "127.0.0.1:8888"  # IPython host and port
methods = ["GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH",
           "CONNECT"]


@app.route('/', defaults={'url': ''}, methods=methods)
@app.route('/<path:url>', methods=methods)
def proxy(url):
    with app.test_request_context():
        if websocket:
            while True:
                data = websocket.receive()
                websocket_url = 'ws://{}/{}'.format(PROXY_DOMAIN, url)
                if websocket_url not in websockets:
                    client = WebSocketClient(websocket_url,
                                             protocols=['http-only', 'chat'])
                    websockets[websocket_url] = client
                else:
                    client = websockets[websocket_url]
                client.connect()
                if data:
                    client.send(data)
                client_data = client.receive()
                if client_data:
                    websocket.send(client_data)
            return Response()

我尝试创建自己的WebSocket代理类,但它也不起作用。
class WebSocketProxy(WebSocketClient):
    def __init__(self, to, *args, **kwargs):
        self.to = to
        print(("Proxy to", self.to))
        super(WebSocketProxy, self).__init__(*args, **kwargs)

    def opened(self):
        m = self.to.receive()
        print("<= %d %s" % (len(m), str(m)))
        self.send(m)

    def closed(self, code, reason):
        print(("Closed down", code, reason))

    def received_message(self, m):
        print("=> %d %s" % (len(m), str(m)))
        self.to.send(m)

常规的请求-响应循环非常有效,所以我删除了那段代码。如果您感兴趣,完整的代码托管在hidra中。

我使用以下命令运行服务器:

$ gunicorn -k flask_sockets.worker hidra:app

这方面有进展了吗?我原本也想建造类似的东西。 - Mikko Ohtamaa
不是很确定。我放弃了这个想法。我猜我们得等待IPython的官方开发。他们也改变了Websockets的实现,也许现在使用Python 3 asyncio是正确的方法。 - Javier de la Rosa
好的。我正在处理这个问题 :) 当我解决它时,您将是第一个被通知的人。 - Mikko Ohtamaa
1个回答

2
这是我的解决方案(大致如此)。它很简陋,但应该作为构建websocket代理的起点。完整代码在未发布的项目pyramid_notebook中可用
  • 这里使用ws4py和uWSGI而不是gunicorn。

  • 我们使用uWSGI的内部机制来接收下游websocket消息循环。在Python世界中,没有像WSGI一样的Websockets机制(至少目前还没有),但看起来每个Web服务器都实现了自己的机制。

  • 创建了一个自定义的ws4py ProxyConnection,可以将ws4py事件循环与uWSGI事件循环相结合。

  • 启动程序后,消息开始飞来飞去。

  • 这里使用Pyramid的request(基于WebOb),但这实际上不应该有什么影响,对于任何Python WSGI应用程序进行少量修改后代码应该都能正常运行。

  • 正如您所看到的,这并没有真正利用异步性,只是在没有从套接字接收到任何内容时使用sleep()。

代码如下:

"""UWSGI websocket proxy."""
from urllib.parse import urlparse, urlunparse
import logging
import time

import uwsgi
from ws4py import WS_VERSION
from ws4py.client import WebSocketBaseClient


#: HTTP headers we need to proxy to upstream websocket server when the Connect: upgrade is performed
CAPTURE_CONNECT_HEADERS = ["sec-websocket-extensions", "sec-websocket-key", "origin"]


logger = logging.getLogger(__name__)


class ProxyClient(WebSocketBaseClient):
    """Proxy between upstream WebSocket server and downstream UWSGI."""

    @property
    def handshake_headers(self):
        """
        List of headers appropriate for the upgrade
        handshake.
        """
        headers = [
            ('Host', self.host),
            ('Connection', 'Upgrade'),
            ('Upgrade', 'websocket'),
            ('Sec-WebSocket-Key', self.key.decode('utf-8')),
            # Origin is proxyed from the downstream server, don't set it twice
            # ('Origin', self.url),
            ('Sec-WebSocket-Version', str(max(WS_VERSION)))
            ]

        if self.protocols:
            headers.append(('Sec-WebSocket-Protocol', ','.join(self.protocols)))

        if self.extra_headers:
            headers.extend(self.extra_headers)

        logger.info("Handshake headers: %s", headers)
        return headers

    def received_message(self, m):
        """Push upstream messages to downstream."""

        # TODO: No support for binary messages
        m = str(m)
        logger.debug("Incoming upstream WS: %s", m)
        uwsgi.websocket_send(m)
        logger.debug("Send ok")

    def handshake_ok(self):
        """
        Called when the upgrade handshake has completed
        successfully.

        Starts the client's thread.
        """
        self.run()

    def terminate(self):
        raise RuntimeError("NO!")
        super(ProxyClient, self).terminate()

    def run(self):
        """Combine async uwsgi message loop with ws4py message loop.

        TODO: This could do some serious optimizations and behave asynchronously correct instead of just sleep().
        """

        self.sock.setblocking(False)
        try:
            while not self.terminated:
                logger.debug("Doing nothing")
                time.sleep(0.050)

                logger.debug("Asking for downstream msg")
                msg = uwsgi.websocket_recv_nb()
                if msg:
                    logger.debug("Incoming downstream WS: %s", msg)
                    self.send(msg)

                s = self.stream

                self.opened()

                logger.debug("Asking for upstream msg")
                try:
                    bytes = self.sock.recv(self.reading_buffer_size)
                    if bytes:
                        self.process(bytes)
                except BlockingIOError:
                    pass

        except Exception as e:
            logger.exception(e)
        finally:
            logger.info("Terminating WS proxy loop")
            self.terminate()


def serve_websocket(request, port):
    """Start UWSGI websocket loop and proxy."""
    env = request.environ

    # Send HTTP response 101 Switch Protocol downstream
    uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', ''))

    # Map the websocket URL to the upstream localhost:4000x Notebook instance
    parts = urlparse(request.url)
    parts = parts._replace(scheme="ws", netloc="localhost:{}".format(port))
    url = urlunparse(parts)

    # Proxy initial connection headers
    headers = [(header, value) for header, value in request.headers.items() if header.lower() in CAPTURE_CONNECT_HEADERS]

    logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers)

    ws = ProxyClient(url, headers=headers)
    ws.connect()

    # Happens only if exceptions fly around
    return ""

1
尽管它没有使用我需要的相同库,但由于pyramid_notebook的出色工作,我将其标记为答案。谢谢! - Javier de la Rosa

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接