Python:是否有用于分块输入流的库函数?

6

我想对输入流进行分块以进行批处理。给定一个输入列表或生成器,

它可以被分成固定大小的块,以便逐个块地处理它们。
x_in = [1, 2, 3, 4, 5, 6 ...]

我希望有一个函数可以返回输入的块。比如,如果 chunk_size=4,那么,

x_chunked = [[1, 2, 3, 4], [5, 6, ...], ...]

我经常需要做这件事情,想知道是否有比自己编写更标准的方法。在 itertools 中是否有我遗漏的内容?(可以使用 enumerategroupby 来解决问题,但感觉有些笨重。)如果有人想看实现方式,这里是:

def chunk_input_stream(input_stream, chunk_size):
    """partition a generator in a streaming fashion"""
    assert chunk_size >= 1
    accumulator = []
    for x in input_stream:
        accumulator.append(x)
        if len(accumulator) == chunk_size:
            yield accumulator
            accumulator = []
    if accumulator:
        yield accumulator

编辑

受kreativitea答案的启发,这里提供一个使用islice的解决方案,它直截了当且不需要后置过滤。

from itertools import islice

def chunk_input_stream(input_stream, chunk_size):
    while True:
        chunk = list(islice(input_stream, chunk_size))
        if chunk:
            yield chunk
        else:
            return

# test it with list(chunk_input_stream(iter([1, 2, 3, 4]), 3))
3个回答

6
< p >来自itertools的配方:

def grouper(n, iterable, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)

1
你能否使用 itertools.repeat 替代 []*n - jpm
1
这将需要一些微调来适应OP的情况 - 如果我理解正确,OP的代码没有填充。 - DSM
1
@jpm 鉴于整个过程都将直接作为参数传递到“zip_longest()”调用中,我想生成器的开销会使它比列表乘法更慢。这种方法更简单,可能更快。 - Gareth Latty
如果你将其作为一个函数,那么人们可以阅读注释并查看函数名称。这是常见的做法,虽然不明显,但一旦掌握了概念,它就清晰、简单和快速。 - Gareth Latty
1
@gatoatigrado,我并没有想要表现出高人一等的意思。说实话,我真的没有看到比这个解决方案更好的解决方案了。 - Gareth Latty
显示剩余8条评论

4
[感谢原帖作者更新:自从我升级版本以来,我一直在滥用“yield from”,甚至没有意识到我在这里不需要它。]
噢,管它呢:
from itertools import takewhile, islice, count

def chunk(stream, size):
    return takewhile(bool, (list(islice(stream, size)) for _ in count()))

给出:

>>> list(chunk((i for i in range(3)), 3))
[[0, 1, 2]]
>>> list(chunk((i for i in range(6)), 3))
[[0, 1, 2], [3, 4, 5]]
>>> list(chunk((i for i in range(8)), 3))
[[0, 1, 2], [3, 4, 5], [6, 7]]

警告:如果输入是列表,上述内容将遇到与OP的chunk_input_stream相同的问题。您可以通过额外的iter()包装来解决这个问题,但这样做不太美观。从概念上讲,使用repeat或cycle可能比count()更合理,但出于某种原因我在计算字符数。 :^)
[FTR:不,我仍然不完全认真对待这个问题,但嘿 - 这是星期一。]

如果你只是return takewhile...,在Python 2.x中就不需要使用yield from。进行这个编辑,我会将其标记为正确答案。另外,为了完整性,你可能需要包含导入行from itertools import takewhile, islice, count。你的解决方案简洁明了,实际上相当直接(请参见我的评论,了解为什么Jon的解决方案不是),并且有效 - 谢谢! - gatoatigrado

1

你为什么不使用类似这样的东西呢?

# data is your stream, n is your chunk length
[data[i:i+n] for i in xrange(0,len(data),n)]

编辑:

由于有人正在制作生成器……

def grouper(data, n):
    results = [data[i:i+n] for i in xrange(0,len(data),n)]
    for result in results:
        yield result
编辑2:

我在想,如果你将输入流作为双端队列存储在内存中,你可以非常高效地使用.popleft来产生n个对象。

from collections import deque
stream = deque(data)

def chunk(stream, n):
    """ Returns the next chunk from a data stream. """
    return [stream.popleft() for i in xrange(n)]

def chunks(stream, n, reps):
    """ If you want to yield more than one chunk. """
    for item in [chunk(stream, n) for i in xrange(reps)]:
        yield item

流属性在处理大量数据时非常重要。另一方面,islice很好用... - gatoatigrado
很好。使用islice时,您不必断言负值,因为异常是内置的。 - kreativitea

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接