JSON编码非常长的迭代器

18

我正在编写一个Web服务,它返回包含非常长的列表的对象,已经编码成JSON格式。 当然,我们希望使用迭代器而不是Python列表,这样我们可以从数据库中流式传输对象; 不幸的是,标准库中的JSON编码器 (json.JSONEncoder) 仅接受将列表和元组转换为JSON列表 (虽然_iterencode_list 看起来似乎可以在任何可迭代对象上运行)。

文档字符串建议重写默认值以将对象转换为列表,但这意味着我们失去了流式传输的好处。 以前,我们曾覆盖一个私有方法,但(预计)当编码器重构时会出现问题。

在Python中以流式方式将迭代器序列化为JSON列表的最佳方法是什么?

4个回答

9

我正需要这个。第一种方法是覆盖JSONEncoder.iterencode()方法。然而,这不起作用,因为只要迭代器不是顶级的,一些_iterencode()函数的内部就会接管。

经过对代码的研究,我找到了一个非常巧妙的解决方案,但它只适用于Python 3,但我相信在Python 2中也可能有同样的魔法(只是其他魔术方法名称):

import collections.abc
import json
import itertools
import sys
import resource
import time
starttime = time.time()
lasttime = None


def log_memory():
    if "linux" in sys.platform.lower():
        to_MB = 1024
    else:
        to_MB = 1024 * 1024
    print("Memory: %.1f MB, time since start: %.1f sec%s" % (
        resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / to_MB,
        time.time() - starttime,
        "; since last call: %.1f sec" % (time.time() - lasttime) if lasttime
        else "",
    ))
    globals()["lasttime"] = time.time()


class IterEncoder(json.JSONEncoder):
    """
    JSON Encoder that encodes iterators as well.
    Write directly to file to use minimal memory
    """
    class FakeListIterator(list):
        def __init__(self, iterable):
            self.iterable = iter(iterable)
            try:
                self.firstitem = next(self.iterable)
                self.truthy = True
            except StopIteration:
                self.truthy = False

        def __iter__(self):
            if not self.truthy:
                return iter([])
            return itertools.chain([self.firstitem], self.iterable)

        def __len__(self):
            raise NotImplementedError("Fakelist has no length")

        def __getitem__(self, i):
            raise NotImplementedError("Fakelist has no getitem")

        def __setitem__(self, i):
            raise NotImplementedError("Fakelist has no setitem")

        def __bool__(self):
            return self.truthy

    def default(self, o):
        if isinstance(o, collections.abc.Iterable):
            return type(self).FakeListIterator(o)
        return super().default(o)

print(json.dumps((i for i in range(10)), cls=IterEncoder))
print(json.dumps((i for i in range(0)), cls=IterEncoder))
print(json.dumps({"a": (i for i in range(10))}, cls=IterEncoder))
print(json.dumps({"a": (i for i in range(0))}, cls=IterEncoder))


log_memory()
print("dumping 10M numbers as incrementally")
with open("/dev/null", "wt") as fp:
    json.dump(range(10000000), fp, cls=IterEncoder)
log_memory()
print("dumping 10M numbers built in encoder")
with open("/dev/null", "wt") as fp:
    json.dump(list(range(10000000)), fp)
log_memory()

结果:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[]
{"a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}
{"a": []}
Memory: 8.4 MB, time since start: 0.0 sec
dumping 10M numbers as incrementally
Memory: 9.0 MB, time since start: 8.6 sec; since last call: 8.6 sec
dumping 10M numbers built in encoder
Memory: 395.5 MB, time since start: 17.1 sec; since last call: 8.5 sec

很明显,IterEncoder不需要存储10M个整数的内存,同时保持相同的编码速度。
这种"hacky"技巧在于_iterencode_list实际上并不需要list中的任何东西。它只是想知道list是否为空(__bool__),然后获取其迭代器。但是,只有当isinstance(x, (list, tuple))返回True时,它才能到达此代码。因此,我将迭代器打包成一个列表子类,然后禁用所有随机访问,预先获取第一个元素,以便我知道它是否为空,并反馈迭代器。然后,在迭代器的情况下,default方法会返回这个虚假的列表。

2
这是一个非常出色的解决方案,启发我编写了一个类似的更短的FakeListIterator版本,用于回答类似的问题。它不需要引发异常,包括__len____bool____repr__等未被覆盖的方法,一切都能无缝运行。 - hynekcer
顺便提一下,如果你只是要覆盖 JSONEncoderdefault 方法,那么你不需要对其进行子类化,只需定义一个函数并将其传递给 json.dumps(obj, default=...)default 关键字参数即可。 - cowbert
这个解决方案的价值超出我在此处所能描述的范畴!我正在处理数据集,并希望通过API将大小不同的数据从小到大传输到Databricks DBFS。对流进行JSON编码并进行add_block()调用有效地解决了我的每块最大1 MiB的问题。谢谢@Claude! - Jari Turkia

2

把这个代码保存到一个模块文件中并导入它,或者直接粘贴到你的代码中。

'''
Copied from Python 2.7.8 json.encoder lib, diff follows:
@@ -331,6 +331,8 @@
                     chunks = _iterencode(value, _current_indent_level)
                 for chunk in chunks:
                     yield chunk
+        if first:
+            yield buf
         if newline_indent is not None:
             _current_indent_level -= 1
             yield '\n' + (' ' * (_indent * _current_indent_level))
@@ -427,12 +429,12 @@
             yield str(o)
         elif isinstance(o, float):
             yield _floatstr(o)
-        elif isinstance(o, (list, tuple)):
-            for chunk in _iterencode_list(o, _current_indent_level):
-                yield chunk
         elif isinstance(o, dict):
             for chunk in _iterencode_dict(o, _current_indent_level):
                 yield chunk
+        elif hasattr(o, '__iter__'):
+            for chunk in _iterencode_list(o, _current_indent_level):
+                yield chunk
         else:
             if markers is not None:
                 markerid = id(o)
'''
from json import encoder

def _make_iterencode(markers, _default, _encoder, _indent, _floatstr,
        _key_separator, _item_separator, _sort_keys, _skipkeys, _one_shot,
        ## HACK: hand-optimized bytecode; turn globals into locals
        ValueError=ValueError,
        basestring=basestring,
        dict=dict,
        float=float,
        id=id,
        int=int,
        isinstance=isinstance,
        list=list,
        long=long,
        str=str,
        tuple=tuple,
    ):

    def _iterencode_list(lst, _current_indent_level):
        if not lst:
            yield '[]'
            return
        if markers is not None:
            markerid = id(lst)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = lst
        buf = '['
        if _indent is not None:
            _current_indent_level += 1
            newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
            separator = _item_separator + newline_indent
            buf += newline_indent
        else:
            newline_indent = None
            separator = _item_separator
        first = True
        for value in lst:
            if first:
                first = False
            else:
                buf = separator
            if isinstance(value, basestring):
                yield buf + _encoder(value)
            elif value is None:
                yield buf + 'null'
            elif value is True:
                yield buf + 'true'
            elif value is False:
                yield buf + 'false'
            elif isinstance(value, (int, long)):
                yield buf + str(value)
            elif isinstance(value, float):
                yield buf + _floatstr(value)
            else:
                yield buf
                if isinstance(value, (list, tuple)):
                    chunks = _iterencode_list(value, _current_indent_level)
                elif isinstance(value, dict):
                    chunks = _iterencode_dict(value, _current_indent_level)
                else:
                    chunks = _iterencode(value, _current_indent_level)
                for chunk in chunks:
                    yield chunk
        if first:
            yield buf
        if newline_indent is not None:
            _current_indent_level -= 1
            yield '\n' + (' ' * (_indent * _current_indent_level))
        yield ']'
        if markers is not None:
            del markers[markerid]

    def _iterencode_dict(dct, _current_indent_level):
        if not dct:
            yield '{}'
            return
        if markers is not None:
            markerid = id(dct)
            if markerid in markers:
                raise ValueError("Circular reference detected")
            markers[markerid] = dct
        yield '{'
        if _indent is not None:
            _current_indent_level += 1
            newline_indent = '\n' + (' ' * (_indent * _current_indent_level))
            item_separator = _item_separator + newline_indent
            yield newline_indent
        else:
            newline_indent = None
            item_separator = _item_separator
        first = True
        if _sort_keys:
            items = sorted(dct.items(), key=lambda kv: kv[0])
        else:
            items = dct.iteritems()
        for key, value in items:
            if isinstance(key, basestring):
                pass
            # JavaScript is weakly typed for these, so it makes sense to
            # also allow them.  Many encoders seem to do something like this.
            elif isinstance(key, float):
                key = _floatstr(key)
            elif key is True:
                key = 'true'
            elif key is False:
                key = 'false'
            elif key is None:
                key = 'null'
            elif isinstance(key, (int, long)):
                key = str(key)
            elif _skipkeys:
                continue
            else:
                raise TypeError("key " + repr(key) + " is not a string")
            if first:
                first = False
            else:
                yield item_separator
            yield _encoder(key)
            yield _key_separator
            if isinstance(value, basestring):
                yield _encoder(value)
            elif value is None:
                yield 'null'
            elif value is True:
                yield 'true'
            elif value is False:
                yield 'false'
            elif isinstance(value, (int, long)):
                yield str(value)
            elif isinstance(value, float):
                yield _floatstr(value)
            else:
                if isinstance(value, (list, tuple)):
                    chunks = _iterencode_list(value, _current_indent_level)
                elif isinstance(value, dict):
                    chunks = _iterencode_dict(value, _current_indent_level)
                else:
                    chunks = _iterencode(value, _current_indent_level)
                for chunk in chunks:
                    yield chunk
        if newline_indent is not None:
            _current_indent_level -= 1
            yield '\n' + (' ' * (_indent * _current_indent_level))
        yield '}'
        if markers is not None:
            del markers[markerid]

    def _iterencode(o, _current_indent_level):
        if isinstance(o, basestring):
            yield _encoder(o)
        elif o is None:
            yield 'null'
        elif o is True:
            yield 'true'
        elif o is False:
            yield 'false'
        elif isinstance(o, (int, long)):
            yield str(o)
        elif isinstance(o, float):
            yield _floatstr(o)
        elif isinstance(o, dict):
            for chunk in _iterencode_dict(o, _current_indent_level):
                yield chunk
        elif hasattr(o, '__iter__'):
            for chunk in _iterencode_list(o, _current_indent_level):
                yield chunk
        else:
            if markers is not None:
                markerid = id(o)
                if markerid in markers:
                    raise ValueError("Circular reference detected")
                markers[markerid] = o
            o = _default(o)
            for chunk in _iterencode(o, _current_indent_level):
                yield chunk
            if markers is not None:
                del markers[markerid]

    return _iterencode

encoder._make_iterencode = _make_iterencode

-1

实时流媒体不被json很好地支持,因为这意味着客户端应用程序也必须支持流媒体。有一些Java库支持读取流式的json流,但它并不是很通用。还有一些Python绑定可供使用,如支持流媒体的C库yail

也许你可以使用Yaml代替jsonYamljson的超集。它在双方都有更好的流媒体支持,并且任何json消息仍然有效。

但在你的情况下,将对象流分割成单独的json消息流可能更简单。

还可以参考此讨论,哪些客户端库支持流媒体:是否有用于JSON的流API?


3
这个问题“是否有适用于JSON的流API?”是关于解析JSON而不是创建它的。 - Piotr Dobrogost
这只是不正确的说法:“这也意味着客户端应用程序必须支持流式传输。”在服务器端,您可能关心内存占用,而在客户端,您可以将完整对象保留在内存中。 - Vajk Hermecz
你如何将部分列表从服务器流式传输到不支持流式传输的客户端? - Hans Then
@HansThen 也许服务器暂时保存了文本JSON表示,但这并不意味着它必须还存储一个可迭代列表的实例。也许服务器甚至能够流式传输JSON表示(而不必将其全部保存在内存中),网络上的某些内容会处理客户端的缓冲。 - Ian Goldby

-2

并不是那么简单。大多数人使用的WSGI协议不支持流式传输。而支持它的服务器则违反了规范。

即使您使用不符合规范的服务器,那么您也必须使用类似于ijson的东西。 还要看看这个遇到与您相同问题的人http://www.enricozini.org/2011/tips/python-stream-json/

编辑:然后一切都取决于客户端,我想它将会用Javascript(?)编写。但我不知道如何从不完整的JSON块构建javascript(或任何语言)对象。我唯一能想到的是,在服务器端手动将长JSON分解为较小的JSON对象,然后逐个流式传输到客户端。但这需要Websockets而不是无状态的HTTP请求/响应。如果您所说的Web服务是REST API,那么我想这不是您想要的。


1
我的问题实际上与WSGI无关。ijson是一个解析器,而不是编码器。 - Max
PEP 3333(WSGI协议)中没有禁止流式响应。唯一的规定是,一旦开始向客户端写入数据,WSGI服务器就不能在内部缓冲或阻塞。大多数WSGI服务器只需在收到应用程序的字节流时不断调用fflush(3)或类似抽象函数来输出套接字,因此可以完全接受迭代生成器(序列化后立即将产生的对象发送到套接字)。 - cowbert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接