如何加速Python的for循环

3

我有一个字典列表,就像这样:

[{'user': '123456', 'db': 'db1', 'size': '8628'}
{'user': '123456', 'db': 'db1', 'size': '7168'}
{'user': '123456', 'db': 'db1', 'size': '38160'}
{'user': '222345', 'db': 'db3', 'size': '8628'}
{'user': '222345', 'db': 'db3', 'size': '8628'}
{'user': '222345', 'db': 'db5', 'size': '840'}
{'user': '34521', 'db': 'db6', 'size': '12288'}
{'user': '34521', 'db': 'db6', 'size': '476'}
{'user': '2345156', 'db': 'db7', 'size': '5120'}.....]

这个列表包含数百万个条目。每个用户可能会出现在多个数据库中,每个用户在同一数据库中可能会有多个条目。我想总结每个用户在每个数据库中占用的空间大小。我不想使用pandas。

  • 我创建了两个唯一用户和唯一数据库的列表
  • 使用这些列表遍历大列表,并在用户和数据库相同时进行求和
result = []
for user in unique_users:
    for db in unique_dbs:
        total_size = 0
        for i in big_list:
            if (i['user'] == user and i['db'] == db):
                total_size += float(i['size'])
        if(total_size) > 0:
            row = {}
            row['user'] = user
            row['db'] = db
            row['size'] = total_size
            result.append(row)

问题在于这个三重循环会变得非常庞大(数百亿次迭代),需要花费很长时间来汇总结果。如果big_list很小,那么这种方法就可以很好地工作。

我应该如何处理才能保持快速和简单?非常感谢!


为什么不创建一个用户到数据库列表及其相应大小的映射呢? - rdas
我该怎么做? - Ioan Fulgeanu
为什么你不想使用pandas呢?它会让这变得轻而易举... - Nick
数据是否按用户和数据库排序? - Nick
我想总结每个用户在每个数据库中占用的空间大小。 我不太明白。 对于此处显示的输入,输出应该是什么? - Karl Knechtel
4个回答

3

尝试将用户映射到数据库中的总大小字典中。这将需要额外的内存,但应该更快地访问,并且只需要一次通过数据:

user_to_db_to_size = {}
for entry in unique_users:
    user = entry['user']
    db = entry['db']
    size = int(entry['size'])
    if user not in user_to_db_to_size:
        user_to_db_to_size[user] = {}
    if db not in user_to_db_to_size[user]:
        user_to_db_to_size[user][db] = 0
    user_to_db_to_size[user][db] += size

print(user_to_db_to_size)

对于您的样例数据,它会生成:
{'123456': {'db1': 53956}, '222345': {'db3': 17256, 'db5': 840}, '34521': {'db6': 12764}, '2345156': {'db7': 5120}}

现在,您可以使用以下代码查看每个用户/数据库的总大小:

print(user_to_db_to_size['123456']['db1'])  # 53956

只是一个提示,缓存基本字典查找将使您获得5%的速度提升。我在下面包含了一些基准测试。 - John M.

3
总的来说,针对这个问题提出的三个解决方案中,rdas的方法可能是最优的。经过一些修改后,它比Jérôme的解决方案快57%,比原始代码快180倍。在裁剪后的结果子集(1000个条目)上,Сергей的解决方案大约慢了350倍;它似乎也很难扩展,我没有时间等待完整数据集的结果。
计时如下:
方法 时间 相对值
原始 102.83720700000413 180.4580695623077 Jérôme 0.8722491000080481 1.5306171118096032 Jérôme(更新) 0.9000219000154175 1.5793526426731528 rdas 0.6866130999987945 1.2048642527015463 rdas(更新) 0.6580462999991141 1.1547354157572032 使用defaultdict的rdas 0.5698675999883562 1.0
在Jérôme的答案中,最终结果的计算可以通过将最终列表构建换成列表推导式来改进。单独查看列表推导式更改,更新后的代码大约快38%。值得注意的是,这不是总体时间的主要贡献因素。
[{'user': user, 'db': db, 'size': total_size} for user in unique_users for db in unique_dbs if (total_size := aggregate_dict.get((user, db)))]

对几次迭代进行前后比较(Method1和Method2),结果如下:

Method1: 6.447344800049905
Method2: 4.851344000024255
Method1: 6.546811900043394
Method2: 4.816081999975722
Method1: 6.863985500007402
Method2: 4.755402299982961
Method1: 6.8024070000392385
Method2: 4.738170499971602
Method1: 6.524162200046703
Method2: 4.755566900013946

rdas的第一个更新是避免进行多个字典查找,通过缓存查找第一个字典来实现:
if not (user_dict := user_to_db_to_size.get(user)):
    user_to_db_to_size[user] = { db: size }
elif not db in user_dict:
    user_dict[db] = size
else:
    user_dict[db] += size

这之后通过更换为 collections.defaultdict 再次改进,该方法移除了额外的昂贵字典查找操作:

user_to_db_to_size = defaultdict(lambda: defaultdict(int))
for entry in big_list:
    user = entry['user']
    db = entry['db']
    size = int(entry['size'])
    user_to_db_to_size[user][db] += size
return user_to_db_to_size

这是用于运行基准测试的代码:

import random
import timeit
from collections import defaultdict

random.seed(1)

test_iterations = 100
big_list = [{'user': random.randint(0, 100), 'db': f'db{random.randint(1, 10)}', 'size': f'{random.randint(100, 90000)}' } for i in range(10000)]
unique_users = { i['user'] for i in big_list }
unique_dbs = { i['db'] for i in big_list }

def method0():
    result = []
    for user in unique_users:
        for db in unique_dbs:
            total_size = 0
            for i in big_list:
                if (i['user'] == user and i['db'] == db):
                    total_size += float(i['size'])
            if(total_size) > 0:
                row = {}
                row['user'] = user
                row['db'] = db
                row['size'] = total_size
                result.append(row)
    return result

def method1():
    aggregate_dict = dict()
    for i in big_list:
        key = (i['user'], i['db'])
        value = float(i['size'])
        if key in aggregate_dict:
            aggregate_dict[key] += value
        else:
            aggregate_dict[key] = value
    result = []
    for user in unique_users:
        for db in unique_dbs:
            total_size = aggregate_dict.get((user, db))
            if total_size is not None and total_size > 0:
                result.append({'user': user, 'db': db, 'size': total_size})
    return result

def method2():
    aggregate_dict = dict()
    for i in big_list:
        key = (i['user'], i['db'])
        value = float(i['size'])
        if key in aggregate_dict:
            aggregate_dict[key] += value
        else:
            aggregate_dict[key] = value
    return [{'user': user, 'db': db, 'size': total_size} for user in unique_users for db in unique_dbs if (total_size := aggregate_dict.get((user, db)))]

def method3():
    user_to_db_to_size = {}
    for entry in big_list:
        user = entry['user']
        db = entry['db']
        size = int(entry['size'])
        if user not in user_to_db_to_size:
            user_to_db_to_size[user] = {}
        if db not in user_to_db_to_size[user]:
            user_to_db_to_size[user][db] = 0
        user_to_db_to_size[user][db] += size
    return user_to_db_to_size

def method4():
    user_to_db_to_size = {}
    for entry in big_list:
        user = entry['user']
        db = entry['db']
        size = int(entry['size'])
        if not (user_dict := user_to_db_to_size.get(user)):
            user_to_db_to_size[user] = { db: size }
        elif not db in user_dict:
            user_dict[db] = size
        else:
            user_dict[db] += size
    return user_to_db_to_size

def method5():
    user_to_db_to_size = defaultdict(lambda: defaultdict(int))
    for entry in big_list:
        user = entry['user']
        db = entry['db']
        size = int(entry['size'])
        user_to_db_to_size[user][db] += size
    return user_to_db_to_size

assert (m0 := method0()) == method1()
print('Method1 OK')
assert m0 == method2()
print('Method2 OK')
assert all(v['size'] == method3()[v['user']][v['db']] for v in m0)
print('Method3 OK')
assert all(v['size'] == method4()[v['user']][v['db']] for v in m0)
print('Method4 OK')
assert all(v['size'] == method5()[v['user']][v['db']] for v in m0)
print('Method5 OK')

t0 = timeit.timeit(method0, number=test_iterations)
t1 = timeit.timeit(method1, number=test_iterations)
t2 = timeit.timeit(method2, number=test_iterations)
t3 = timeit.timeit(method3, number=test_iterations)
t4 = timeit.timeit(method4, number=test_iterations)
t5 = timeit.timeit(method5, number=test_iterations)
tmin = min((t0, t1, t2, t3, t4, t5))

print(f'| Method           | Time | Relative      |')
print(f'|------------------|----------------------|')
print(f'| Original         | {t0} | {t0 / tmin}   |')
print(f'| Jérôme           | {t1} | {t1 / tmin}   |')
print(f'| Jérôme (updated) | {t2} | {t2 / tmin}   |')
print(f'| rdas             | {t3} | {t3 / tmin}   |')
print(f'| rdas (updated)   | {t4} | {t4 / tmin}   |')
print(f'| defaultdict      | {t5} | {t5 / tmin}   |')

结果为:

Method1 OK
Method2 OK
Method3 OK
Method4 OK
Method4 OK
| Method           | Time | Relative      |
|------------------|----------------------|
| Original         | 102.83720700000413 | 180.4580695623077   |
| Jérôme           | 0.8722491000080481 | 1.5306171118096032   |
| Jérôme (updated) | 0.9000219000154175 | 1.5793526426731528   |
| rdas             | 0.6866130999987945 | 1.2048642527015463   |
| rdas (updated)   | 0.6580462999991141 | 1.1547354157572032   |
| defaultdict      | 0.5698675999883562 | 1.0   |

注意:Сергей的方法大致相当于:

def c():
    tmp = Counter()
    for d in big_list:
        tmp = tmp + Counter(d)
    return tmp

因此,在循环中每次都会创建一个新的字典。对于循环中的每次迭代,它创建的字典将会更大。我怀疑这样的规模是 N ** 2 或更糟糕。


3

目前方法存在两个主要问题:算法效率低和数据结构效率低。

首先,使用的算法明显效率低下,因为它在大列表上进行了多次迭代。没有必要遍历整个列表来过滤唯一的用户和数据库。您可以一次遍历大列表并使用字典来聚合数据。目标字典的键是一个(user, db)元组。字典的值为total_size。以下是未经测试的示例:

# Aggregation part
# Note: a default dict can be used instead to make the code possibly simpler
aggregate_dict = dict()
for i in big_list:
    key = (i['user'], i['db'])
    value = float(i['size'])
    if key in aggregate_dict:
        aggregate_dict[key] += value
    else:
        aggregate_dict[key] = value

# Fast creation of `result`
result = []
for user in unique_users:
    for db in unique_dbs:
        total_size = aggregate_dict.get((user, key))
        if total_size is not None and total_size > 0:
            result.append({'user': user, 'db': db, 'size': total_size})

另一个问题是数据结构效率低下:对于每一行,键被复制了多次,而应该使用元组来代替。实际上,更好的数据结构是存储一个字典,其中包含(column, items)键值对,其中items是目标列的项目列表。这种存储数据的方式被称为数据帧(Dataframe)。这就是Pandas在内部使用的方法(除了它是一个Numpy数组,这甚至更好,因为它比大多数操作的列表更紧凑,通常更有效)。如果将此数据结构用于输入和输出,应该会显著提高速度(如果与Numpy结合使用)并降低内存占用。

1
注意,使用列表推导式作为最终结果的创建方式大约快8%,因此可以使用以下代码:[{'user': user, 'db': db, 'size': total_size} for user in unique_users for db in unique_dbs if (total_size := aggregate_dict.get((user, key)))]。请注意,假设大小为正数,则我认为if total_size is not None and total_size > 0可以简化为if total_size,因为总大小为None和零都将计算为false。我将在下面附上测试代码供您参考。虽然时间有点不稳定,但列表推导式始终更快。 - John M.
@JohnM。确实是为了理解列表。对于大小,它们肯定是正数,但可能不是严格的正数(即可能为空)。感谢您指出这一点并完成基准测试。 - Jérôme Richard
1
不客气。虽然你做了所有的艰苦工作。但是我相信null仍然会被评估为false。 - John M.
1
实际上,我认为8%不正确。在aggregate_dict.get((user, key))中的查找键应该是aggregate_dict.get((user, db))。这个错误意味着结果列表为空,因为它实际上在搜索元组(user, (key, value))。将其更改为db可以使列表推导式快38%。 - John M.

1
如果您使用计数器并将值对元组(用户,数据库)作为键,则:
from collections import Counter

data = [{'user': '123456', 'db': 'db1', 'size': '8628'},
        {'user': '123456', 'db': 'db1', 'size': '7168'},
        {'user': '123456', 'db': 'db1', 'size': '38160'},
        {'user': '222345', 'db': 'db3', 'size': '8628'},
        {'user': '222345', 'db': 'db3', 'size': '8628'},
        {'user': '222345', 'db': 'db5', 'size': '840'},
        {'user': '34521', 'db': 'db6', 'size': '12288'},
        {'user': '34521', 'db': 'db6', 'size': '476'},
        {'user': '2345156', 'db': 'db7', 'size': '5120'}]

print(sum((Counter({(d['user'], d['db']): int(d['size'])}) for d in data), start=Counter()))

Counter({('123456', 'db1'): 53956, ('222345', 'db3'): 17256, ('34521', 'db6'): 12764, ('2345156', 'db7'): 5120, ('222345', 'db5'): 840})

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接