你能想到一种好的方式(也许使用itertools)将迭代器分成给定大小的块吗?
因此,l=[1,2,3,4,5,6,7]
,使用chunks(l,3)
变为迭代器[1,2,3], [4,5,6], [7]
我可以想出一个小程序来实现,但不知道是否有更好的方法,也许可以使用itertools。
你能想到一种好的方式(也许使用itertools)将迭代器分成给定大小的块吗?
因此,l=[1,2,3,4,5,6,7]
,使用chunks(l,3)
变为迭代器[1,2,3], [4,5,6], [7]
我可以想出一个小程序来实现,但不知道是否有更好的方法,也许可以使用itertools。
itertools
文档的recipes的grouper()
方案接近您想要的内容:
def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
"Collect data into non-overlapping fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
# grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
# grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
args = [iter(iterable)] * n
if incomplete == 'fill':
return zip_longest(*args, fillvalue=fillvalue)
if incomplete == 'strict':
return zip(*args, strict=True)
if incomplete == 'ignore':
return zip(*args)
else:
raise ValueError('Expected fill, strict, or ignore')
def batched(iterable, n):
"Batch data into tuples of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')
it = iter(iterable)
while (batch := tuple(islice(it, n))):
yield batch
(my_list[i:i + chunk_size] for i in range(0, len(my_list), chunk_size))
n
非常有效,但是对于大型群体来说,效率非常低下。例如我的n
是20万。创建一个临时列表有20万项......不太理想。 - Jonathan Eunice.resize()
方法。如果您想分块普通迭代器,第二种方法已经非常好了--它创建了大小为200K的临时元组,但这并不是大问题。 - Sven Marnach尽管 OP 要求函数以列表或元组的形式返回块,但如果您需要返回迭代器,则可以修改Sven Marnach的解决方案:
def batched_it(iterable, n):
"Batch data into iterators of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
if n < 1:
raise ValueError('n must be at least one')
it = iter(iterable)
while True:
chunk_it = itertools.islice(it, n)
try:
first_el = next(chunk_it)
except StopIteration:
return
yield itertools.chain((first_el,), chunk_it)
一些基准测试:http://pastebin.com/YkKFvm8b
只有当您的函数遍历每个块中的元素时,它才会略微更有效率。
chunk_it
(例如,通过提前跳出内部循环),那么这样做会出现错误吗? - Tavian Barnesfor x in it: yield chain((x,), islice(it, n))
,对吧? - Claasislice(it, n - 1)
(或者为了性能,你需要在一开始就减少 n
,并验证它是否仍然 >=0
)来正确计算数量,但是是的,这将是一个稍微快一点的解决方案(因为它会将每个项目的工作量略微推向 C 层)。 - ShadowRangerPython 3.12增加了itertools.batched,它可以处理所有可迭代对象(包括列表):
>>> from itertools import batched
>>> for batch in batched('ABCDEFG', 3):
... print(batch)
('A', 'B', 'C')
('D', 'E', 'F')
('G',)
自 Python 3.8 版本以来,有一个更简单的解决方案,使用 :=
运算符:
def grouper(iterator: Iterator, n: int) -> Iterator[list]:
while chunk := list(itertools.islice(iterator, n)):
yield chunk
然后以那种方式调用它:
>>> list(grouper(iter('ABCDEFG'), 3))
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
注意:您可以在grouper
函数中放置iter
以使用可迭代对象
而不是迭代器
。
Iterator
类型,可以使用 from collections.abc import Iterator
进行导入。 - Md Mazedul Islam Khangrouper
中直接传递列表(例如OP示例中的l
)会导致无限循环。请改为传递iter(l)
,或相应地修改函数。 - Nicolai Weitkemper这适用于任何可迭代对象。它返回生成器的生成器(以完全灵活性为基础)。我现在意识到它基本上与 @reclosedevs 的解决方案相同,但没有废话。不需要 try...except
,因为 StopIteration
会传播上来,这正是我们想要的。
调用 next(iterable)
来提高 iterable
为空时的 StopIteration
,因为如果你让它这样做,islice
将会无限产生空生成器。
这个方法更好,因为它只有两行代码,但易于理解。
def grouper(iterable, n):
while True:
yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))
请注意,next(iterable)
被放入一个元组中。否则,如果next(iterable)
本身是可迭代对象,则itertools.chain
会将其展开。感谢Jeremy Brown指出这个问题。
注意:next(iterable)
会被放进一个元组中。否则,如果next(iterable)
本身也是可迭代对象的话,就会被itertools.chain
展开。感谢Jeremy Brown指出这个问题。
next(iterable)
而不是iterable.next()
。 - Antti Haapala -- Слава Україніiterable = iter(iterable)
可能是有意义的,它可以将你的iterable转换为一个iterator。需要注意的是,可迭代对象(Iterables)没有__next__
方法。 - Mateen Ulhaq今天我在做一些工作,想出了一个我认为是简单的解决方案。它类似于jsbueno的答案,但我认为当iterable
的长度可被n
整除时,他的答案会产生空的group
。我的答案在iterable
用尽时进行简单检查。
def chunk(iterable, chunk_size):
"""Generates lists of `chunk_size` elements from `iterable`.
>>> list(chunk((2, 3, 5, 7), 3))
[[2, 3, 5], [7]]
>>> list(chunk((2, 3, 5, 7), 2))
[[2, 3], [5, 7]]
"""
iterable = iter(iterable)
while True:
chunk = []
try:
for _ in range(chunk_size):
chunk.append(next(iterable))
yield chunk
except StopIteration:
if chunk:
yield chunk
break
iterable.next()
更改为next(iterable)
。 - rrauenzamap(list, chunks(...))
。from itertools import islice, chain
from collections import deque
def chunks(items, n):
items = iter(items)
for first in items:
chunk = chain((first,), islice(items, n-1))
yield chunk
deque(chunk, 0)
if __name__ == "__main__":
for chunk in map(list, chunks(range(10), 3)):
print chunk
for i, chunk in enumerate(chunks(range(10), 3)):
if i % 2 == 1:
print "chunk #%d: %s" % (i, list(chunk))
else:
print "skipping #%d" % i
list(chunks(range(10), 3))
时,所有的可迭代对象都已经被消耗完了。 - Sven Marnachchunker = lambda iterable, n: (ifilterfalse(lambda x: x == (), chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=())))
[iter(iterable)]*n
是一个包含相同迭代器的列表,zipping将从列表中的每个迭代器中取出一个项目,这些迭代器是相同的,结果每个zip元素包含一组n
项。
izip_longest
需要完全消耗底层可迭代对象,而不是在到达第一个用尽的迭代器时停止迭代,这会截断iterable
的任何剩余部分。这导致需要过滤填充值。因此,稍微更健壮的实现如下:def chunker(iterable, n):
class Filler(object): pass
return (ifilterfalse(lambda x: x is Filler, chunk) for chunk in (izip_longest(*[iter(iterable)]*n, fillvalue=Filler)))
iterable = range(1,11)
map(tuple,chunker(iterable, 3))
[(1, 2, 3), (4, 5, 6), (7, 8, 9), (10,)]
map(tuple,chunker(iterable, 2))
[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
map(tuple,chunker(iterable, 4))
[(1, 2, 3, 4), (5, 6, 7, 8), (9, 10)]
def chunks(it, step):
start = 0
while True:
end = start+step
yield islice(it, start, end)
start = end
区别在于,由于islice
在调用超出it
结尾时没有引发StopIteration或其他任何异常,因此它会一直生成;还有一个稍微棘手的问题是必须先消耗islice
结果,然后才能迭代该生成器。
要功能性地生成移动窗口:
izip(count(0, step), count(step, step))
(it[start:end] for (start,end) in izip(count(0, step), count(step, step)))
但是,这仍然会创建一个无限迭代器。因此,需要使用takewhile(或者可能有更好的选择)来限制它:
chunk = lambda it, step: takewhile((lambda x: len(x) > 0), (it[start:end] for (start,end) in izip(count(0, step), count(step, step))))
g = chunk(range(1,11), 3)
tuple(g)
([1, 2, 3], [4, 5, 6], [7, 8, 9], [10])
deque(chunk, 0)
来消耗它们,但这个解决方案也有问题--请参见我对他的答案的评论。 - Sven Marnachchunker()
的最新版本。另外,创建一个唯一的标记的好方法是sentinel = object()
- 它保证与任何其他对象都不同。 - Sven Marnachdef grouper (iterable, n):
iterable = iter(iterable)
count = 0
group = []
while True:
try:
group.append(next(iterable))
count += 1
if count % n == 0:
yield group
group = []
except StopIteration:
yield group
break
代码高尔夫版:
def grouper(iterable, n):
for i in range(0, len(iterable), n):
yield iterable[i:i+n]
使用方法:
>>> list(grouper('ABCDEFG', 3))
['ABC', 'DEF', 'G']
grouper
应该接受一个迭代器。 - Cédric ROYER