Django 3.1: 使用异步生成器实现 StreamingHttpResponse

18

Django 3.1 文档 对于异步视图的描述如下:

主要好处在于能够不使用 Python 线程服务数百个连接。这允许您使用缓慢的流式传输、长轮询和其他令人兴奋的响应类型。

我认为“缓慢的流式传输”意味着我们可以实现一个 SSE 视图,而不会为每个客户端占用一个线程,因此我尝试绘制了一个简单的视图,如下所示:

async def stream(request):

    async def event_stream():
        while True:
            yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
            await asyncio.sleep(1)

    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')

(注意: 我修改了代码,参考了这个回答)

不幸的是,当该视图被调用时,会引发以下异常:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/asgiref/sync.py", line 330, in thread_handler
    raise exc_info[1]
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/exception.py", line 38, in inner
    response = await get_response(request)
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/base.py", line 231, in _get_response_async
    response = await wrapped_callback(request, *callback_args, **callback_kwargs)
  File "./chat/views.py", line 144, in watch
    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 367, in __init__
    self.streaming_content = streaming_content
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 382, in streaming_content
    self._set_streaming_content(value)
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 386, in _set_streaming_content
    self._iterator = iter(value)
TypeError: 'async_generator' object is not iterable

在我看来,这说明StreamingHttpResponse目前不支持异步生成器。

我尝试修改StreamingHttpResponse以使用async for,但是我并没有做太多工作。

您有任何想法我该怎么做吗?


我99%确定它不受支持,因为响应对象中没有任何可等待的内容。 - Tom Wojcik
@BenoitBlanchon,那你的实际目标是什么?是有一个页面可以生成分块响应(用于大型响应),还是具备在其他事件发生时异步发送SSE(服务器端事件)的能力?或者完全不同的东西?因为我从你的问题中看到了固定的“StreamingHttpResponse”,但你仍然没有接受该响应。 - wowkin2
我的目标是发送SSE,我将接受第一个有效的响应。 - Benoit Blanchon
5个回答

5

说实话,Django本身并不支持这个功能,但我有一个解决方案,可以使用Daphne(也被Django channels所使用)来实现。

我创建了自己的StreamingHttpResponse类,它能够从异步方法中获取数据流,并将其提供给Django同步部分。

import asyncio

# By design asyncio does not allow its event loop to be nested.
# Trying to do so will give the error "RuntimeError: This event loop is already running".
# This library solves that problem.
import nest_asyncio

from django.http.response import StreamingHttpResponse


class AsyncStreamingHttpResponse(StreamingHttpResponse):

    def __init__(self, streaming_content=(), *args, **kwargs):
        sync_streaming_content = self.get_sync_iterator(streaming_content)
        super().__init__(streaming_content=sync_streaming_content, *args, **kwargs)

    @staticmethod
    async def convert_async_iterable(stream):
        """Accepts async_generator and async_iterator"""
        return iter([chunk async for chunk in stream])

    def get_sync_iterator(self, async_iterable):
        nest_asyncio.apply()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(self.convert_async_iterable(async_iterable))
        return result

此外,您需要使用Daphne运行Django服务器以正确支持服务器发送事件(SSE)。它受"Django Software Foundation"的官方支持,并具有类似于gunicorn的语法,但是使用asgi.py而不是wsgi.py
使用它时,您可以使用以下命令进行安装:pip install daphne 将命令从:python manage.py runserver更改为类似于:daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application
不确定它是否与gunicorn兼容。
如果您有任何问题,请告诉我。

1
在代码中,这只是一个命名(来自作者的问题的想法),可以更改。但我相信它与“服务器发送事件”有关。将类从“SSEResponse”重命名为“AsyncStreamingHttpResponse”,以使其他人更清楚。 - wowkin2
我刚试过了 daphne,但好像不起作用。 curl -N 127.0.0.1:8000 没有显示任何东西,而且 daphne 进程被卡住了,甚至不能响应 Ctrl-C - Benoit Blanchon
1
@BenoitBlanchon 这是我的完整代码,存放在这个仓库中:https://github.com/wowkin2/django-test-streaming-response - wowkin2
@BenoitBlanchon回答了所有问题。 - wowkin2
2
我认为我的表述已经很清楚了,请看这个页面的顶部:“我将授予第一个找到在Django 3.1+中使用异步视图进行SSE的正确方法的人200声望值。” - Benoit Blanchon
显示剩余9条评论

2

这是一个老问题,但因为我正在寻找解决方案,所以它出现在了谷歌搜索结果中。最终我发现了这个仓库https://github.com/valberg/django-sse - 它使用Django 4.2中的异步视图通过SSE进行流式传输(具体请参见此处)。

我知道这是Django的最新补充,所以我希望它能帮助到其他人寻找答案。


3
事实上,之前在 Django 3.2 上无法工作的代码,现在神奇地在 4.2 上可以工作了。Valberg 在他的存储库中更进一步地使用 PostgreSQL 实现了通知,这非常棒。非常感谢分享! - Benoit Blanchon

0

我创建了一个名为stream的装饰器,可以与协程函数一起使用,使其与Django的StreamingHttpResponse兼容。以下是一个示例:

import asyncio
import functools

from django.http import StreamingHttpResponse


def stream(coroutine_function):
    @functools.wraps(coroutine_function)
    def wrapper(*args, **kwargs):
        coroutine = coroutine_function(*args, **kwargs)
        try:
            while True:
                yield asyncio.run(coroutine.__anext__())
        except StopAsyncIteration:
            pass
    return wrapper


@stream
async def chunks():
    for char in 'Hello, world!':
        yield char
        await asyncio.sleep(1)


async def index(request):
    return StreamingHttpResponse(chunks())

我还需要在settings.py文件的顶部添加nest_asyncio并调用apply(),如下所示:

import nest_asyncio
nest_asyncio.apply()

nest_asyncio 依赖支持从 stream 装饰器创建的 wrapper 函数中调用 asyncio.run

最后,Django 的 asgi 可以通过 gunicorn 运行 uvicorn,如下所示:

$ gunicorn -k uvicorn.workers.UvicornWorker www.asgi:application

我在使用uvloop运行时遇到了问题。 - Patrik Beck

0

进行 SSE 的另一种方式是使用特殊库django-eventstream

将以下内容添加到将消费数据的 HTML 页面中:

<script src="{% static 'django_eventstream/eventsource.min.js' %}"></script>
<script src="{% static 'django_eventstream/reconnecting-eventsource.js' %}"></script>

var es = new ReconnectingEventSource('/events/');

es.addEventListener('message', function (e) {
    console.log(e.data);
}, false);

es.addEventListener('stream-reset', function (e) {
    // ... client fell behind, reinitialize ...
}, false);

对于后端,你需要正确设置Django, 然后你就可以在任何需要进行服务器端事件(SSE)的视图/任务/信号/方法中调用以下方法:

添加以下视图以生成数据(事件):

# from django_eventstream import send_event

send_event('test', 'message', {'text': 'hello world'})

django-eventstream 依赖于 Django Channels,并且不利用 Django 3.1 的新异步视图。 - Benoit Blanchon
@BenoitBlanchon同意,无论如何你都可以从任何异步视图中调用它。或者你可以使用纯Django Channels来利用它。 - wowkin2

-1

看起来你需要使用类似于django-channel的东西:

Channels扩展了Django,为您的代码提供WebSocket、长轮询HTTP、任务卸载和其他异步支持,使用熟悉的Django设计模式和灵活的底层框架,不仅可以自定义行为,还可以编写支持自己的协议和需求。


谢谢您的回复,@Adrien。但是这并没有回答我的问题。我知道Django channels,但我想知道现在是否可以避免使用它,因为Django 3.1支持异步视图。 - Benoit Blanchon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接