金字塔流响应体

12

我正试图从我的Pyramid应用程序中流式传输服务器发送的事件(Server-Sent Events),但我无法弄清楚如何从视图(streaming portion)中流式传输响应正文(response body)。这是我正在使用的测试视图(它完全没有实现SSE,只是为了解决流媒体部分):

@view_config(route_name='iter_test')
def iter_test(request):
    import time
    def test_iter():
        i = 0
        while True:
            i += 1
            if i == 5:
                raise StopIteration
            yield str(time.time())
            print time.time()
            time.sleep(1)

    return test_iter()

这会产生一个错误:ValueError:无法将视图可调用函数 pdiff.views.iter_test 的返回值转换为响应对象。返回值是<generator object test_iter at 0x3dc19b0>。

我尝试使用return Response(app_iter = test_iter())代替,至少不会出错,但它不会流式传输响应 - 它会等待生成器完成后才将响应返回给我的浏览器。

我知道可以每个请求仅返回单个事件,并允许客户端在每个事件后重新连接,但我更喜欢通过从单个请求中流式传输多个事件来保留服务器发送事件的实时特性,而无需重新连接延迟。如何在Pyramid中实现呢?

3个回答

10

我已经找到了问题所在。原来我的应用代码没问题,问题出在 Waitress 和 nginx 上:

  1. 默认的 Pyramid Web 服务器 Waitress 将所有输出缓存为 18000 字节的块(请参见这个问题以获取详细信息)。

  2. 问题的根源被 Nginx 隐藏起来,Nginx 是我放在 Pyramid 应用程序前面的 Web 服务器,它也会缓冲响应。

(1) 可以通过以下任一方式解决:

  • 在您的 .ini 文件中使用 send_bytes = 1 配置 Waitress。这样可以解决流媒体问题,但会使整个应用程序变得非常慢。正如 @Zitrax 提到的那样,您可以通过更高的值恢复一些速度,但任何高于 1 的值都会使消息陷入缓冲区。

  • 切换到 Gunicorn。我不知道 Gunicorn 是否只使用较小的缓冲区,或者它是否对 app_iter 表现更好,但它有效,并且保持我的应用程序快速。

(2) 可以通过配置 Nginx 禁用流媒体路由的缓冲来解决。

您需要在 Nginx 配置中设置 proxy_buffering off。此设置适用于通过 proxy_pass 托管的站点。如果您没有使用 proxy_pass,则可能需要不同的设置。

  • 您可以根据请求头信息动态地为每个响应启用/禁用缓冲,如有关此主题的问题所示(EventSource/Server-Sent Events 的好解决方案)

  • 你可以在nginx配置文件中的location块中进行配置。如果你使用的不是EventSource并且不需要接收特定的头信息,或者你正在使用EventSource但希望调试来自普通浏览器选项卡的响应,在这种情况下,这是一个好选择,因为你不能在请求中发送Accept头信息。


  • 我曾经遇到过同样的问题,确实将send_bytes设置为1可以解决。但是这会使一切变得非常缓慢(我之前加载页面只需要不到0.5秒,现在却需要超过10秒)。而且我目前无法切换到gunicorn,因为它只适用于Unix系统。一个折中的办法是将send_bytes设置为大于1且小于18000的值。在我的情况下,100效果还不错。 - Zitrax

    4

    我之前做了一些测试,尝试使用事件源/服务器推送事件。我刚刚测试了一下,它仍然与Pyramid 1.5a很好地配合。

    @view_config(route_name = 'events')
    def events(request):
        headers = [('Content-Type', 'text/event-stream'),
                   ('Cache-Control', 'no-cache')]
        response = Response(headerlist=headers)
        response.app_iter = message_generator()
        return response
    
    def message_generator():
        socket2 = context.socket(zmq.SUB)
        socket2.connect(SOCK)
        socket2.setsockopt(zmq.SUBSCRIBE, '')
        while True:
            msg = socket2.recv()
            yield "data: %s\n\n" % json.dumps({'message': msg})
    

    完整的示例在此处:https://github.com/antoineleclair/zmq-sse-chat。请查看https://github.com/antoineleclair/zmq-sse-chat/blob/master/sse/views.py
    我不确定为什么我的代码能够正常工作而你的不能。可能是因为头文件的原因,或者每个消息后面有两个'\n'。顺便说一下,如果你正确地查看事件源规范,你需要在每个新事件前加上data:并使用\n\n作为事件分隔符。

    谢谢!知道我的方法肯定有效,让我意识到其他潜在的问题,最终带领我找到了解决方案。 - spiffytech
    这个不再起作用了。它不能返回字节,所以使用yield "data: %s\n\n" % json.dumps({'message': msg}).encode() - Pierre Thibault

    2

    如果您没有为视图指定任何渲染器,则必须返回一个响应对象。Pyramid响应对象有一个特殊的参数app_iter用于返回迭代器。因此,您应该以以下方式进行操作:

    import time
    from pyramid.response import Response
    
    
    @view_config(route_name='iter_test')
    def iter_test(request):
    
        def test_iter():
            for _ in range(5):
                yield str(time.time())
                print time.time()
                time.sleep(1)
    
        return Response(app_iter=test_iter())
    

    我还稍微修改了你的代码,使其更易读。

    更新

    我尝试使用 return Response(app_iter=test_iter()) ,至少不会出错,但它不会流式传输响应 - 它会等待生成器完成后再将响应返回给我的浏览器。

    我猜问题在于缓冲。尝试发送一个非常大的迭代器。


    在我的测试中,app_iter 没有被流式传输到浏览器。相反,webob 在返回结果之前运行迭代器以完成(相关的 webob 代码在这里:github)。为了使 EventSource 流正常工作,我的迭代器需要是无限的(即不能轻易地完成),并且结果需要在生成时立即传递给浏览器,而不是一次性传递。 - spiffytech
    @spiffytech 你提供的代码是“body” getter,它将“app_iter”折叠成单个字符串。但是WebOb并不这样做:请参阅call方法的代码。我没有检查Pyramid源代码,但我猜它也不会这样做。因此,如果您返回一个带有“app_iter”的Response对象,并且在您的代码中不访问“body”,那么WebOb将按原样返回“app_iter”。 - Dmitry Vakhrushev
    缓冲似乎不是问题——我尝试了一个更大的生成器(50k项),Pyramid仍然等到它完成才发送结果。无论如何,我需要不等待——Server-Sent事件流的重点是事件立即传递到浏览器,因此我不能等待缓冲区填满。此外,我链接到的webob代码部分实际上正在被调用——我通过在迭代器内设置断点并在迭代器运行时查看调用堆栈来获得该行号,使用上面的代码——没有访问body和适当的Response对象。 - spiffytech

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接