在Python中如何以(n)个元素为一组迭代遍历一个迭代器?

156

你能想到一种好的方式(也许使用itertools)将迭代器分成给定大小的块吗?

因此,l=[1,2,3,4,5,6,7],使用chunks(l,3)变为迭代器[1,2,3], [4,5,6], [7]

我可以想出一个小程序来实现,但不知道是否有更好的方法,也许可以使用itertools。


3
这接近正确,但由于对最后一块的处理不同,所以并非完全相同。 - Sven Marnach
5
这有些不同,因为那个问题涉及列表,而这个问题则更一般化,涉及迭代器。虽然答案似乎最终是相同的。 - recursive
@recursive:是的,在完全阅读了链接的帖子后,我发现我回答中的所有内容都已经在其他帖子中出现过了。 - Sven Marnach
https://dev59.com/lnVC5IYBdhLWcg3wYQAp#312464 - johnson
1
由于其中一个链接的问题特别涉及列表,而不是一般的可迭代对象,因此使用VTR。 - wjandrea
这个回答解决了你的问题吗?Python生成器,将另一个可迭代对象分组为N组 - Tomerikoo
16个回答

182
< p >来自itertools文档的recipesgrouper()方案接近您想要的内容:

def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
    "Collect data into non-overlapping fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    args = [iter(iterable)] * n
    if incomplete == 'fill':
        return zip_longest(*args, fillvalue=fillvalue)
    if incomplete == 'strict':
        return zip(*args, strict=True)
    if incomplete == 'ignore':
        return zip(*args)
    else:
        raise ValueError('Expected fill, strict, or ignore')

这种方法在最后一块不完整的情况下可能效果不佳,具体取决于“不完整”模式,它可能会用填充值填充最后一块,引发异常或默默丢弃不完整的块。
在最新版本的配方中,他们添加了“批处理”配方,可以完全满足您的需求。
def batched(iterable, n):
    "Batch data into tuples of length n. The last batch may be shorter."
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while (batch := tuple(islice(it, n))):
        yield batch

最后,一个更具体的解决方案,只适用于序列,但可以按预期处理最后一块,并保留原始序列的类型:
(my_list[i:i + chunk_size] for i in range(0, len(my_list), chunk_size))

6
不行,因为这会导致无限循环。 - Sven Marnach
14
我很惊讶这个答案得到了如此多的投票。这个配方对于小规模的n非常有效,但是对于大型群体来说,效率非常低下。例如我的n是20万。创建一个临时列表有20万项......不太理想。 - Jonathan Eunice
7
几乎所有情况下,这就是人们想要的(这也是为什么它被包含在Python文档中的原因)。针对特定情况进行优化超出了此问题的范围,即使您在评论中包含了信息,我也无法确定最佳方法。如果您想分块一个适合内存的数字列表,您可能最好使用NumPy的.resize()方法。如果您想分块普通迭代器,第二种方法已经非常好了--它创建了大小为200K的临时元组,但这并不是大问题。 - Sven Marnach
6
@SvenMarnach 我们的意见不同。我认为人们需要的是方便,而不是不必要的额外负担。文档提供了过于臃肿的答案,导致出现了额外的负担。对于大量数据,临时元组/列表等超过200K或1M项的内容会使程序消耗大量多余的内存并且运行时间更长。如果没有必要,为什么要这么做呢?当元素数量为200K时,多余的临时储存会导致整个程序的运行时间比移除该部分代码之前多3.5倍,这是一个相当大的问题。由于迭代器是一个数据库游标而不是数字列表,因此NumPy无法使用。 - Jonathan Eunice
2
izip_longest在Python 3中已更名为zip_longest。 - hojin
显示剩余8条评论

94

尽管 OP 要求函数以列表或元组的形式返回块,但如果您需要返回迭代器,则可以修改Sven Marnach的解决方案:

def batched_it(iterable, n):
    "Batch data into iterators of length n. The last batch may be shorter."
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while True:
        chunk_it = itertools.islice(it, n)
        try:
            first_el = next(chunk_it)
        except StopIteration:
            return
        yield itertools.chain((first_el,), chunk_it)

一些基准测试:http://pastebin.com/YkKFvm8b

只有当您的函数遍历每个块中的元素时,它才会略微更有效率。


24
在找到上方被接受并得到最高票的答案后,我几乎完全采用了这种设计。但我发现文档中的答案效率非常低下,尤其是当你一次性地对成千上万或数百万个对象进行分组时--这正是你最需要分割的时候--它必须非常高效。这就是正确的答案。 - Jonathan Eunice
这是最佳解决方案。 - Lawrence
5
如果调用者没有用尽chunk_it(例如,通过提前跳出内部循环),那么这样做会出现错误吗? - Tavian Barnes
4
有点晚了:这个优秀的答案可以通过用for循环替换while循环来缩短一点:for x in it: yield chain((x,), islice(it, n)),对吧? - Claas
1
@Claas:好的,你需要 islice(it, n - 1)(或者为了性能,你需要在一开始就减少 n,并验证它是否仍然 >=0)来正确计算数量,但是是的,这将是一个稍微快一点的解决方案(因为它会将每个项目的工作量略微推向 C 层)。 - ShadowRanger
显示剩余4条评论

24

Python 3.12增加了itertools.batched,它可以处理所有可迭代对象(包括列表):

>>> from itertools import batched
>>> for batch in batched('ABCDEFG', 3):
...     print(batch)
('A', 'B', 'C')
('D', 'E', 'F')
('G',)

22

自 Python 3.8 版本以来,有一个更简单的解决方案,使用 := 运算符:

def grouper(iterator: Iterator, n: int) -> Iterator[list]:
    while chunk := list(itertools.islice(iterator, n)):
        yield chunk

然后以那种方式调用它:

>>> list(grouper(iter('ABCDEFG'), 3))
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]

注意:您可以在grouper函数中放置iter以使用可迭代对象而不是迭代器


1
如果你和我一样找不到 Iterator 类型,可以使用 from collections.abc import Iterator 进行导入。 - Md Mazedul Islam Khan
4
请注意,在grouper中直接传递列表(例如OP示例中的l)会导致无限循环。请改为传递iter(l),或相应地修改函数。 - Nicolai Weitkemper

17

这适用于任何可迭代对象。它返回生成器的生成器(以完全灵活性为基础)。我现在意识到它基本上与 @reclosedevs 的解决方案相同,但没有废话。不需要 try...except,因为 StopIteration 会传播上来,这正是我们想要的。

调用 next(iterable) 来提高 iterable 为空时的 StopIteration,因为如果你让它这样做,islice 将会无限产生空生成器。

这个方法更好,因为它只有两行代码,但易于理解。

def grouper(iterable, n):
    while True:
        yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))
请注意,next(iterable)被放入一个元组中。否则,如果next(iterable)本身是可迭代对象,则itertools.chain会将其展开。感谢Jeremy Brown指出这个问题。

注意:next(iterable)会被放进一个元组中。否则,如果next(iterable)本身也是可迭代对象的话,就会被itertools.chain展开。感谢Jeremy Brown指出这个问题。


3
尽管这可能回答了问题,但加入一些解释和描述可能有助于理解你的方法,并为我们说明为什么你的答案独具特色。 - deW1
2
iterable.next() 需要被包含或者通过迭代器来产生,以便链式操作能够正常工作 - 例如,yield itertools.chain([iterable.next()], itertools.islice(iterable, n-1))。 - Jeremy Brown
3
使用next(iterable)而不是iterable.next() - Antti Haapala -- Слава Україні
4
在while循环前加上一行代码iterable = iter(iterable)可能是有意义的,它可以将你的iterable转换为一个iterator。需要注意的是,可迭代对象(Iterables)没有__next__方法 - Mateen Ulhaq
3
自 PEP479 起,生成器函数中引发 StopIteration 已被弃用。因此,我更喜欢使用 @reclesedev 的解决方案中的显式返回语句。 - loutre
显示剩余4条评论

8

今天我在做一些工作,想出了一个我认为是简单的解决方案。它类似于jsbueno的答案,但我认为当iterable的长度可被n整除时,他的答案会产生空的group。我的答案在iterable用尽时进行简单检查。

def chunk(iterable, chunk_size):
    """Generates lists of `chunk_size` elements from `iterable`.
    
    
    >>> list(chunk((2, 3, 5, 7), 3))
    [[2, 3, 5], [7]]
    >>> list(chunk((2, 3, 5, 7), 2))
    [[2, 3], [5, 7]]
    """
    iterable = iter(iterable)
    while True:
        chunk = []
        try:
            for _ in range(chunk_size):
                chunk.append(next(iterable))
            yield chunk
        except StopIteration:
            if chunk:
                yield chunk
            break

1
对于Python3,您需要将iterable.next()更改为next(iterable) - rrauenza

3
这里有一个返回lazy chunks的方法;如果您需要列表,请使用map(list, chunks(...))
from itertools import islice, chain
from collections import deque

def chunks(items, n):
    items = iter(items)
    for first in items:
        chunk = chain((first,), islice(items, n-1))
        yield chunk
        deque(chunk, 0)

if __name__ == "__main__":
    for chunk in map(list, chunks(range(10), 3)):
        print chunk

    for i, chunk in enumerate(chunks(range(10), 3)):
        if i % 2 == 1:
            print "chunk #%d: %s" % (i, list(chunk))
        else:
            print "skipping #%d" % i

介意对这个如何工作发表评论。 - Marcin
3
注意:此生成器产生的可迭代对象只在请求下一个可迭代对象之前有效。例如,当使用list(chunks(range(10), 3))时,所有的可迭代对象都已经被消耗完了。 - Sven Marnach

3
一种简洁的实现方式如下所示:
chunker = lambda iterable, n: (ifilterfalse(lambda x: x == (), chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=())))

这是因为[iter(iterable)]*n是一个包含相同迭代器的列表,zipping将从列表中的每个迭代器中取出一个项目,这些迭代器是相同的,结果每个zip元素包含一组n项。 izip_longest需要完全消耗底层可迭代对象,而不是在到达第一个用尽的迭代器时停止迭代,这会截断iterable的任何剩余部分。这导致需要过滤填充值。因此,稍微更健壮的实现如下:
def chunker(iterable, n):
    class Filler(object): pass
    return (ifilterfalse(lambda x: x is Filler, chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=Filler)))

这样可以确保填充值不会是基础可迭代对象中的任何一项。按照上面的定义使用:
iterable = range(1,11)

map(tuple,chunker(iterable, 3))
[(1, 2, 3), (4, 5, 6), (7, 8, 9), (10,)]

map(tuple,chunker(iterable, 2))
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

map(tuple,chunker(iterable, 4))
[(1, 2, 3, 4), (5, 6, 7, 8), (9, 10)]

这个实现基本可以满足你的需求,但是它存在一些问题:
def chunks(it, step):
  start = 0
  while True:
    end = start+step
    yield islice(it, start, end)
    start = end

区别在于,由于islice在调用超出it结尾时没有引发StopIteration或其他任何异常,因此它会一直生成;还有一个稍微棘手的问题是必须先消耗islice结果,然后才能迭代该生成器。

要功能性地生成移动窗口:

izip(count(0, step), count(step, step))

所以这就变成了:
(it[start:end] for (start,end) in izip(count(0, step), count(step, step)))

但是,这仍然会创建一个无限迭代器。因此,需要使用takewhile(或者可能有更好的选择)来限制它:

chunk = lambda it, step: takewhile((lambda x: len(x) > 0), (it[start:end] for (start,end) in izip(count(0, step), count(step, step))))

g = chunk(range(1,11), 3)

tuple(g)
([1, 2, 3], [4, 5, 6], [7, 8, 9], [10])


@SvenMarnach:我已根据您的一些观点编辑了代码和文本。非常需要校对。 - Marcin
1
太快了。 :) 我仍然有一个问题与第一个代码片段:它只在yield的切片被消耗时才有效。如果用户不立即消耗它们,可能会发生奇怪的事情。这就是为什么Peter Otten使用deque(chunk, 0)来消耗它们,但这个解决方案也有问题--请参见我对他的答案的评论。 - Sven Marnach
1
我喜欢chunker()的最新版本。另外,创建一个唯一的标记的好方法是sentinel = object() - 它保证与任何其他对象都不同。 - Sven Marnach
我已经颠倒了我的答案顺序,所以请仔细阅读@SvenMarnach的评论。 - Marcin
@SvenMarnach:哇,哨兵的建议很好 - 我没想到过。 - Marcin
显示剩余2条评论

2
“简单胜于复杂” - 一个几行代码的直接生成器就可以完成任务。只需将其放置在某些实用程序模块中即可:
def grouper (iterable, n):
    iterable = iter(iterable)
    count = 0
    group = []
    while True:
        try:
            group.append(next(iterable))
            count += 1
            if count % n == 0:
                yield group
                group = []
        except StopIteration:
            yield group
            break

2

代码高尔夫版:

def grouper(iterable, n):
    for i in range(0, len(iterable), n):
        yield iterable[i:i+n]

使用方法:

>>> list(grouper('ABCDEFG', 3))
['ABC', 'DEF', 'G']

1
实现很好,但它并没有回答这个问题:“如何在Python中按块(n个)迭代迭代器?”,grouper应该接受一个迭代器。 - Cédric ROYER
1
返回翻译的文本:True。但由于这个问题是谷歌搜索“python iterate in chunks” 的第一个结果,我认为它仍然属于这里。 - johnson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接