Flask:通过向客户端写入数据进行流传输?

4

我有一段现成的代码,可以将数据序列化到类似文件对象的数据结构中:

def some_serialization_function(file):
    file.write(...)

在Flask中,我希望能够直接将序列化数据发送到客户端,而不是先将其缓存到内存中。
我查看了werkzeug的ResponseStreamMixin,但我认为它无法在没有缓存的情况下工作:
class StreamResponse(flask.Response, werkzeug.wrappers.ResponseStreamMixin):
   pass

@app.route("/data")
def get_data():
   r = StreamResponse()
   some_serialization_function(r.stream) # everything is buffered into memory
   return r # buffered data is sent after return

我发现所有关于流数据的示例都是基于生成器的,这些生成器在相反的方向上工作(即数据是从生成器中“拉出来”的,而不是通过写入调用“推送”数据),所以我想知道,在 Flask 中是否有一种直接“写入”客户端的方法?
编辑-为了更清楚:我正在寻找一种方式来提供由“some_serialization_function(...)”生成的数据(我无法轻松更改),而不必将该函数写入所有数据到缓冲区/文件中的内存/IO开销。
(我怀疑最终会使用临时文件,因为与实际通过网络发送数据的开销相比,IO开销并不重要。此外,我的主要关注点是内存开销)。
2个回答

5

您可以创建一个类似文件的对象,用于传送生成器向客户端流式传输数据。下面是一个使用队列实现的快速、简单的示例代码:

from queue import Queue

class StreamWriter(object):
    def __init__(self):
        self.queue = Queue()

    def write(self, str):
        self.queue.put(str)

    def read(self):
        str = self.queue.get()
        self.queue.task_done()
        if str == '~':
            return None
        return str

    def close(self):
        self.write('~')  # indicate EOF

这只是一个发布-订阅类型的队列。 read() 方法将会阻塞,直到另一个线程写入数据。
现在你可以使用生成器来流式传输响应。下面的例子展示了一个以序列化函数作为参数的生成器。序列化函数在后台线程中执行,并接收文件对象作为参数。
def generate_response(serialize):
    file = StreamWriter()
    def serialize_task():
        serialize(file)
        file.close()
    threading.Thread(target=serialize_task).start()
    while True:
        chunk = file.read()
        if chunk is None:
            break
        yield chunk

我希望这能对你有所帮助!

这很有帮助,但我认为实际实现需要更多的参与(考虑从线程中传递错误/异常)。在我的使用情况下,务实的做法是使用临时文件。 - hmn
是的,这只是一个简单的测试,以展示如何完成。它不足以用于生产环境。 - Miguel Grinberg

0

如果我理解得没错,您想要:

  • 提供数据流的Flask web应用程序
  • 客户端逐个获取数据,而不是一次性获得整个数据块
  • 由Flask web应用程序控制,因此启动write应用程序。

我认为,这不能实现,因为必须有人控制流程,在Web应用程序的情况下,读取数据的是客户端。

另一方面,如果您想防止将整个内容缓冲到Web应用程序中供客户端使用,您可以逐个读取Web服务器上的数据,并逐部分产生它。

逐部分由服务器提供内容

from flask import Flask, Response, request
app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello World!'

@app.route('/loop')
def loop():
    def generate():
        yield "Hello"
        yield "World"
    return Response(generate())

@app.route('/longloop/<int:rows>')
def longloop(rows):
    def generate(rows):
        for i in xrange(rows):
            yield "{i}: Hello World".format(i=i)
    return Response(generate(rows))

if __name__ == '__main__':
    app.run(debug=True)

诀窍是使用生成器生成输出的响应(Response)对象。

如果您访问http://localhost:5000/longloop/100,您将收到100个问候。

请尝试使用curl从命令行运行此操作,并更好地将输出重定向到/dev/null

$ curl -X GET http://localhost:5000/longloop/120000000 > /dev/null                                                                                                                              
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  538M    0  538M    0     0  1056k      0 --:--:--  0:08:41 --:--:-- 1079k

我们可以看到,脚本现在运行了超过8分钟,flask应用程序消耗的内存仍然大致相同,在我的情况下,它保持在总RAM的0.4%左右。


“Streaming” 可能会产生歧义,我的意思是在不需要缓存的情况下提供由该函数创建的数据服务。 - hmn
@hmn 我在我的编辑答案中提供了Flask应用程序的示例。 - Jan Vlcinsky
抱歉,那对我没有帮助。我已经知道如何使用生成器流式响应了。我的问题是生成数据的函数(我不能轻易更改)工作方向相反。 - hmn
如果您的序列化函数一次性将所有数据写入某个文件,则必须在某处缓冲它,没有其他选择。您可以使用临时文件或StringIO缓冲区来进行缓冲。如果您想要防止这种中间缓冲,您真的必须将序列化函数更改为某个迭代器或生成器。 - Jan Vlcinsky

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接