Python: 将列表生成器转换为可序列化的JSON格式。

26

我该如何将一组JSON文件合并成一个巨大的JSON数组?我有5000个文件和550,000个列表项。

我的第一次尝试是使用 jq ,但看起来jq -s并不适用于大量输入。

jq -s -r '[.[][]]' *.js 
这个命令可以工作,但需要太长的时间才能完成,我真的想用Python来解决这个问题。
这是我的当前代码:
def concatFiles(outName, inFileNames):
    def listGenerator():
        for inName in inFileNames:
            with open(inName, 'r') as f:
                for item in json.load(f):
                    yield item

    with open(outName, 'w') as f:
        json.dump(listGenerator(), f)

我得到:

TypeError: <generator object listGenerator at 0x7f94dc2eb3c0> is not JSON serializable

尝试将所有文件加载到RAM中的任何操作都会触发Linux的OOM-killer。你有什么想法吗?


1
把文档文本串联起来,用逗号隔开怎么样? - bereal
你需要移除每个文件的外部数组。移除每个文件的第一个和最后一个字符应该可以,但我想控制(并移除)JSON缩进。 - Sebastian Wagner
实际上文件有多大?是否可能保持完整序列化数据比您的内存更大? - Alexander Oh
是的,这就是为什么调用list(..)不起作用的原因。 - Sebastian Wagner
您是否也需要在处理JSON之前对其进行验证?如果不需要,则无需将字符串-> JSON->字符串进行转换。只需在每个文件流之间放置逗号并用 [] 包围即可。 - Joel Cornett
5个回答

36

从simplejson 3.8.0版本开始,您可以使用iterable_as_array选项将任何可迭代对象序列化为数组。

# Since simplejson is backwards compatible, you should feel free to import
# it as `json`
import simplejson as json
json.dumps((i*i for i in range(10)), iterable_as_array=True)

结果是 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


24

你应该从list继承并覆盖__iter__方法。

import json

def gen():
    yield 20
    yield 30
    yield 40

class StreamArray(list):
    def __iter__(self):
        return gen()

    # according to the comment below
    def __len__(self):
        return 1

a = [1,2,3]
b = StreamArray()

print(json.dumps([1,a,b]))

结果是 [1, [1, 2, 3], [20, 30, 40]]


5
在Python 2.7.8版本中,StreamArray类还需要重写__len__方法并返回大于0的值(例如为1)。否则,JSON编码器甚至不会调用__iter__方法。 - Tristan
请注意,当使用indent参数且可迭代对象为空时,此解决方案会创建无效的JSON。json.dumps({"products": StreamArray()}, indent=2) # {"products": ]} - Mišo
1
我认为如果可迭代对象为空,我们不应该对长度使用“return 1”。 - Vadim Pushtaev

13
如果一个结果字符串不容易放入内存,但仍然可以从JSON迭代器轻松地写入流中,那么这个通用解决方案对于非常大的数据也非常有用。 (这比“import simplejson…”更好,它可以提供帮助,但不太多)。已在Python 2.7、3.0、3.3、3.6、3.10.0a7中进行了测试。比simplejson快两倍。内存占用小。编写单元测试。
import itertools

class SerializableGenerator(list):
    """Generator that is serializable by JSON"""

    def __init__(self, iterable):
        tmp_body = iter(iterable)
        try:
            self._head = iter([next(tmp_body)])
            self.append(tmp_body)
        except StopIteration:
            self._head = []

    def __iter__(self):
        return itertools.chain(self._head, *self[:1])

普通用法(输入内存占用小,但整个输出字符串仍在内存中):

>>> json.dumps(SerializableGenerator(iter([1, 2])))
"[1, 2]"
>>> json.dumps(SerializableGenerator(iter([])))
"[]"

对于非常庞大的数据,可以将其用作Python 3中JSON块的生成器,并且仍然使用非常少的内存:

>>> iter_json = json.JSONEncoder().iterencode(SerializableGenerator(iter(range(1000000))))
>>> for chunk in iter_json:
...     stream.write(chunk)
# or a naive examle
>>> tuple(iter_json)
('[1', ', 2', ... ', 1000000', ']')

这个类通常被普通的 JSONEncoder().encode(...) 内部使用,也可以被显式地调用 json.dumps(...)JSONEncoder().iterencode(...) 来获取 JSON 块的生成器。

(示例中的 iter() 函数并不是必需的,只是为了演示一个长度未知的非平凡输入。)


测试:

import unittest
import json
# from ?your_module? import SerializableGenerator 


class Test(unittest.TestCase):

    def combined_dump_assert(self, iterable, expect):
        self.assertEqual(json.dumps(SerializableGenerator(iter(iterable))), expect)

    def combined_iterencode_assert(self, iterable, expect):
        encoder = json.JSONEncoder().iterencode
        self.assertEqual(tuple(encoder(SerializableGenerator(iter(iterable)))), expect)

    def test_dump_data(self):
        self.combined_dump_assert(iter([1, "a"]), '[1, "a"]')

    def test_dump_empty(self):
        self.combined_dump_assert(iter([]), '[]')

    def test_iterencode_data(self):
        self.combined_iterencode_assert(iter([1, "a"]), ('[1', ', "a"', ']'))

    def test_iterencode_empty(self):
        self.combined_iterencode_assert(iter([]), ('[]',))

    def test_that_all_data_are_consumed(self):
        gen = SerializableGenerator(iter([1, 2]))
        list(gen)
        self.assertEqual(list(gen), [])

这个解决方案受到三个较旧答案的启发: Vadim Pushtaev(空迭代器问题)、user1158559(过于复杂)和Claude(在另一个问题中,也很复杂)。

与这些解决方案的重要区别包括:

  • 重要方法__len____bool__等从有意义初始化的list类继承一致。
  • 输入的第一项立即由__init__评估(不是由许多其他方法懒惰触发),list类可以立即知道迭代器是否为空。非空的list包含具有生成器的一个项或列表为空,如果迭代器为空。
  • 对于空迭代器的正确长度实现对于JSONEncoder.iterencode(...)方法很重要。
  • 所有其他方法都给出有意义的输出,例如__repr__
   >>> SerializableGenerator((x for x in range(3)))
   [<generator object <genexpr> at 0x........>]

这种解决方案的优点是可以使用标准的JSON序列化器。如果需要支持嵌套生成器,则使用simplejson的解决方案可能是最好的选择,它还有类似的变体iterencode(...)

*.pyi桩文件用于强类型:

from typing import Any, Iterable, Iterator

class SerializableGenerator(list):
    def __init__(self, iterable: Iterable[Any]) -> None: ...
    def __iter__(self) -> Iterator: ...

3

根据被接受的答案,这是我最终采用的StreamArray。它包含两个错误:

  1. 暗示self.__tail__可能是不可变的
  2. len(StreamArray(some_gen)) 只能是0或1

.

class StreamArray(list):

    def __init__(self, gen):
        self.gen = gen

    def destructure(self):
        try:
            return self.__head__, self.__tail__, self.__len__
        except AttributeError:
            try:
                self.__head__ = self.gen.__next__()
                self.__tail__ = self.gen
                self.__len__ = 1 # A lie
            except StopIteration:
                self.__head__ = None
                self.__tail__ = []
                self.__len__ = 0
            return self.__head__, self.__tail__, self.__len__

    def rebuilt_gen(self):
        def rebuilt_gen_inner():
            head, tail, len_ = self.destructure()
            if len_ > 0:
                yield head
            for elem in tail:
                yield elem
        try:
            return self.__rebuilt_gen__
        except AttributeError:
            self.__rebuilt_gen__ = rebuilt_gen_inner()
            return self.__rebuilt_gen__

    def __iter__(self):
        return self.rebuilt_gen()

    def __next__(self):
        return self.rebuilt_gen()

    def __len__(self):
        return self.destructure()[2]

仅限单次使用!


+1:你的解决方案是可行的,但是太复杂了。我认为我实现了更简单的方法。如果你发现我的方法有什么缺点,请看一下我的代码。 - hynekcer
你的看起来很好!对于我的用例,惰性评估第一个项目是一项功能。回想起来,可能可以从itertools中获得一些简化。非常高兴知道这个可以正常工作。 - user1158559

2

我在使用mrjob时遇到了map-reduce任务错误。

正确处理迭代器后,问题得以解决。

如果mapper没有正确处理迭代器的yield,就会出现这个错误。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接