如何批量执行for循环?

27
for x in records:
   data = {}
   for y in sObjectName.describe()['fields']
         data[y['name']] = x[y['name']]
   ls.append(adapter.insert_posts(collection, data))

我希望以500的批量执行代码ls.append(adapter.insert_post(collection,x)),其中x应包含500个数据字典。我可以使用双重for循环和列表创建包含500个数据字典的列表a,然后插入它。我可以按照以下方式执行该操作,但是否有更好的方法?:

for x in records:
    for i in xrange(0,len(records)/500):
        for j in xrange(0,500):
            l=[]
            data = {}
            for y in sObjectName.describe()['fields']:
                data[y['name']] = x[y['name']]
                #print data
            #print data
            l.append(data)
        ls.append(adapter.insert_posts(collection, data))

    for i in xrange(0,len(records)%500):
        l=[]
        data = {}
        for y in sObjectName.describe()['fields']:
            data[y['name']] = x[y['name']]
            #print data
        #print data
        l.append(data)
    ls.append(adapter.insert_posts(collection, data))
5个回答

61
我通常使用的一般结构是这样的:
worklist = [...]
batchsize = 500

for i in range(0, len(worklist), batchsize):
    batch = worklist[i:i+batchsize] # the result might be shorter than batchsize at the end
    # do stuff with batch

请注意,我们在批处理过程中使用了range函数的step参数,以大大简化了处理过程。
在Python 3.12中,你现在可以使用itertools.batched来实现相同的结果:
from itertools import batched

for batch in batched(worklist, batchsize):
    # process the `batch` tuple

1
@Mudits -- 切片对于所有内置的Python序列都是宽容的。 - mgilson
8
在Python3中,使用range而不是xrange才能使这个工作正常。 - alex9311
1
我很惊讶my_list[0:5]基于my_list = [1,2,3]竟然返回了[1, 2, 3](而且没有崩溃)。 - Boern
“worklist” 是什么意思?提前感谢 @nneonneo @mgilson @alex9311 @Boern。 - StressedBoi69420
1
@StressedBoi69420:只需列出您想要分批处理的事项清单。此代码将每次处理列表的500个元素。 - nneonneo

8
如果你正在处理序列,@nneonneo提供的解决方案是你可以得到的最高性能。 如果你想要一个适用于任意可迭代对象的解决方案,你可以看一些itertools recipes,例如grouper:
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return itertools.izip_longest(fillvalue=fillvalue, *args)

我倾向于不使用这个函数,因为它会将最后一组“填充”为 None 以使其与其他组长度相同。通常我会定义自己的变体,它不具有这种行为:

def grouper2(iterable, n):
    iterable = iter(iterable)
    while True:
        tup = tuple(itertools.islice(iterable, 0, n))
        if tup:
            yield tup
        else:
            break

这将生成请求大小的元组。通常来说,这已经足够好了,但是如果我们真的想要一些有趣的东西,我们可以编写一个生成器,它返回正确大小的惰性可迭代对象...
我认为这里“最好”的解决方案在一定程度上取决于手头的问题——特别是原始可迭代对象中的组和对象的大小以及可迭代对象的类型。通常情况下,由于它们更加复杂且很少需要,这两个最后的方法将发现使用较少。但是,如果你感到冒险并想玩一点,那就继续阅读吧!
唯一需要进行的真正修改是获取惰性可迭代对象而不是元组的能力,即查看islice中的下一个值是否存在。在这里,我只是窥视该值——如果该值缺失,将引发StopIteration,这将停止生成器,就像它正常结束一样。如果存在,我将其放回使用itertools.chain
def grouper3(iterable, n):
    iterable = iter(iterable)
    while True:
        group = itertools.islice(iterable, n)
        item = next(group)  # raises StopIteration if the group doesn't yield anything
        yield itertools.chain((item,), group)

需要注意的是,这个最后的函数“仅当”在移动到下一个可迭代对象之前完全耗尽每个可迭代对象时才会“起作用”。 在极端情况下,如果您没有耗尽任何可迭代对象,例如list(grouper3(..., n)),则将获得“m”个可迭代对象,它们只产生1个项目,而不是n(其中“m”是输入可迭代对象的“长度”)。 这种行为有时可能真的很有用,但通常不是。 我们也可以使用itertools中的“consume”配方来修复这个问题(除了导入itertools外,还需要导入collections):

def grouper4(iterable, n):
    iterable = iter(iterable)
    group = []
    while True:
        collections.deque(group, maxlen=0)  # consume all of the last group
        group = itertools.islice(iterable, n)
        item = next(group)  # raises StopIteration if the group doesn't yield anything
        group = itertools.chain((item,), group)
        yield group

当然,list(grouper4(..., n))会返回空的可迭代对象 -- 在下一次调用next之前(例如当for循环回到开始时),未从“group”中取出的任何值都不会被产生。


你可以通过直接在源可迭代对象上检查元素是否存在,而不是在分组上检查,来缩短3、4号分组器的代码。对于grouper3,循环内部可以使用item = next(iterable); yield chain((item,), islice(iterable, n-1))来实现相同的效果。你的代码非常直观和易读,所以我并不建议你改变它,只是提供一种替代的表达方式。 - Mad Physicist

5

我喜欢@nneonneo和@mgilson的答案,但是一遍又一遍地做这件事很烦人。 Python3中的itertools页面底部提到了库more-itertools(我知道这个问题是关于python2的,而这是Python3库,但有些人可能会发现这有用)。 以下代码似乎可以实现您要求的功能:

from more_itertools import chunked # Note: you might also want to look at ichuncked

for batch in chunked(records, 500):
    # Do the work--`batch` is a list of 500 records (or less for the last batch).  

2
也许是这样的吗?
l = []
for ii, x in enumerate(records):
    data = {}
    for y in sObjectName.describe()['fields']
        data[y['name']] = x[y['name']]
    l.append(data)
    if not ii % 500:
        ls.append(adapter.insert_posts(collection, l))
        l = []

1

我认为这里没有涵盖一种特殊情况。假设批处理大小为100,列表大小为103,则上述答案可能会错过最后3个元素。

list = [.....] 103 elements
total_size = len(list)
batch_size_count=100

for start_index in range(0, total_size, batch_size_count):
    list[start_index : start_index+batch_size_count] #Slicing operation

以上切片列表可以发送到每个方法调用中,以完成所有元素的执行。

1
请注意,我的解决方案不会漏掉任何元素。在 Python 中,可以指定一个超出列表结尾的结束索引 - 结果将被简单地截断到可用元素。因此,没有必要通过手动调整结束索引来过度复杂化解决方案。 - nneonneo
你是正确的。我已根据评论修改了逻辑。感谢您的通知。 - Rajesh Somasundaram

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接