将Python列表分成最大内存大小的块

7

给定一个由 bytes 值组成的 Python list:

# actual str values un-important
[
    b'foo',
    b'bar',
    b'baz',
    ...
]

如何将列表分成多个块,使得每个块的最大内存大小低于某个特定值?

例如:如果限制为7个字节,则原始列表将被分割成一个列表的列表。

[
    [b'foo', b'bar'], # sublist 0
    [b'baz'], # sublist 1
    ...
]

每个子列表的长度不会超过7个字节,这取决于列表内容的累积长度。
注意:每个子列表应该按照原始列表的顺序进行最大程度地压缩。在上面的示例中,前两个字符串值被分组,因为这是在7个字节限制下的最大可能值。
提前感谢您的考虑和回复。

5
在Python中获取对象大小是一个相当复杂/昂贵的操作,因为您需要按递归方式迭代容器类型,如此答案所述:https://dev59.com/9HRB5IYBdhLWcg3w-789#30316760。不过这是一个有趣的问题,那么为什么要根据大小将列表拆分成块呢? - Iain Shelvington
1
我有点困惑你试图实现什么。字典本身的大小将保持一致,因为它们实际上并不包含值,只是对值的引用。这是一个 XY 问题吗?在我的系统中,sys.getsizeof 对于所有这些返回 240{'a': []}, {'b': [1]}, {'c': [1, 2] * 999} - DeepSpace
单个字典是否可以超过1 KB的阈值? - Grzegorz Skibinski
@IainShelvington 这些字典最终会被广播到 AWS Kinesis,其最大批处理记录大小为5MB。 - Ramón J Romero y Vigil
1
@MisterMiyagi 好的,我明白你的意思了,基本上和我在第一条评论中所说的一样。 - DeepSpace
显示剩余11条评论
5个回答

7
将一个序列按照给定的最大/最小条件进行最优分割,同时保持元素的顺序,这个问题可以使用贪心算法来解决。因此,你只需要迭代一次输入序列并维护元素缓冲区即可。
在Python中,可以使用生成器来优雅地编写代码,并且不需要创建结果数组。
下面是该问题的主要算法:
def split_by_size(items, max_size, get_size=len):
    buffer = []
    buffer_size = 0
    for item in items:
        item_size = get_size(item)
        if buffer_size + item_size <= max_size:
            buffer.append(item)
            buffer_size += item_size
        else:
            yield buffer
            buffer = [item]
            buffer_size = item_size
    if buffer_size > 0:
        yield buffer

其中最后一个参数将决定给定项目的大小的问题委托给指定的可调用对象。

我不会详细解释这个参数,但是我假设一个简单的len()函数就可以了。此外,这也假设每个元素都满足条件,否则还需要处理这种情况。

测试以上代码:

import random


k = 10
n = 15
max_size = 10

random.seed(0)
items = [b'x' * random.randint(1, 2 * k // 3) for _ in range(n)]
print(items)
# [b'xxxx', b'xxxx', b'x', b'xxx', b'xxxxx', b'xxxx', b'xxxx', b'xxx', b'xxxx', b'xxx', b'xxxxx', b'xx', b'xxxxx', b'xx', b'xxx']

print(list(split_by_size(items, k)))
# [[b'xxxx', b'xxxx', b'x'], [b'xxx', b'xxxxx'], [b'xxxx', b'xxxx'], [b'xxx', b'xxxx', b'xxx'], [b'xxxxx', b'xx'], [b'xxxxx', b'xx', b'xxx']]

此外,如果您仍愿意将拆分结果存储在一个list中,那么以上方法的代码可以稍微更紧凑一些:
def chunks_by_size(items, max_size, get_size=len):
    result = []
    size = max_size + 1
    for item in items:
        item_size = get_size(item)
        size += item_size
        if size > max_size:
            result.append([])
            size = item_size
        result[-1].append(item)
    return result

但速度稍慢(请见下面的基准测试结果)。


您还可以考虑使用 functools.reduce()(与@NizamMohamed答案基本相同),这样代码会更短,但也许不太易读:

def chunks_by_size_reduce(items, size, get_size=len):
    return functools.reduce(
        lambda a, b, size=size:
            a[-1].append(b) or a
            if a and sum(get_size(x) for x in a[-1]) + get_size(b) <= size
            else a.append([b]) or a, items, [])

由于对于每个被考虑的元素都要调用"candidate"内部列表的每个元素,因此效率显然会更低,这使得复杂度为O(n k!),其中k是每个子序列中平均元素的数量。请参见下面的基准测试结果。


如果使用itertools.accumulate()来解决问题,我不会感到惊讶,但这也肯定会相当慢。


加速的最简单方法是使用CythonNumba,在这里,它们被应用于split_by_size()函数。对于两者而言,代码都不需要改变。

对所有这些进行基准测试,我们获得以下结果(_cy代表Cython编译版本,而_nb代表Numba编译版本):

%timeit list(split_by_size(items * 100000, k + 1))
# 10 loops, best of 3: 281 ms per loop
%timeit list(split_by_size_cy(items * 100000, k + 1))
# 10 loops, best of 3: 181 ms per loop
%timeit list(split_by_size_nb(items * 100000, k + 1))
# 100 loops, best of 3: 5.17 ms per loop
%timeit chunks_by_size(items * 100000, k + 1)
# 10 loops, best of 3: 318 ms per loop
%timeit chunks_by_size_reduce(items * 100000, k + 1)
# 1 loop, best of 3: 1.18 s per loop

需要注意的是,尽管Numba编译版本比其他替代方案快得多,但它也是最脆弱的,因为它需要设置forceobj标志为True,这可能导致执行不稳定。

无论如何,如果最终目标是通过某些I/O操作发送分组项目,我几乎不认为这会成为瓶颈。


请注意,该算法与其他答案基本相同,只是我认为这里的代码更加简洁。


这个很好用!对于任何想要基于内存查找潜在其他大小可调用项的人,我测试了来自Pympler的asizeof模块以及sys.getsizeof作为示例中len的替代品。更多细节请参见https://dev59.com/iZPea4cB1Zd3GeqP9gOc - jcp

2
这个解决方案使用了functools.reduce
l = [b'abc', b'def', b'ghi', b'jklm', b'nopqrstuv', b'wx', b'yz']

reduce(lambda a, b, size=7: a[-1].append(b) or a if a and sum(len(x) for x in a[-1]) + len(b) <= size else a.append([b]) or a, l, [])

a 是一个空的 listb 是原始 list 中的一项。

if a and sum(len(x) for x in a[-1]) + len(b) <= size
检查 a 不为空且最后添加的 list 中的 bytes 长度总和加上 b 的长度不超过 size

a[-1].append(b) or a
b 添加到 a 的最后一个添加的 list 中,并在条件成立时返回 a

a.append([b]) or a
创建一个包含 blist 并将其附加到 a,并在条件成立时返回 a

输出:

[[b'abc', b'def'], [b'ghi', b'jklm'], [b'nopqrstuv'], [b'wx', b'yz']]

1
保持简短而甜美:

l = [b'foo', b'bar', b'baz']

thresh = 7
out = []
cur_size = 0
for x in l:
    if len(x) > thresh:
        raise ValueError("str too big")
    if cur_size + len(x) > thresh:
        cur_size = 0
    if cur_size == 0:
        out.append([])
    out[-1].append(x)
    cur_size += len(x)

print(out)

这将输出:
[[b'foo', b'bar'], [b'baz']]

如果我理解正确,那应该是你想要的。它非常简单;它只是将列表中的字符串附加在一起,并检查正在附加到的当前列表中所有内容的组合大小--如果大小加上下一个项目将大于阈值,则重新启动。

1
简单而天真的方法是:

import sys
import numpy as np

# init input data - as per the comments, data type does matter, 
# for memory calculation, and for the sake of example -
# string is probably the easiest case:

lts=list("abcdefghijklmnopqrstuvwxyz")

data=[{letter: "".join(np.random.choice(lts, np.random.randint(100, 700)))} for letter in lts]

# parameters setup:

threshold=1024
buffer=[]
buffer_len=0
res_data=[]

for el in data:
    len_=sys.getsizeof(list(el.values())[0]) # I assumed it's one key, one value per dictionary (looks like this from your question) 
    if(buffer_len+len_>threshold):
        res_data.append(buffer)
        buffer=[el]
        buffer_len=len_
    else:
        buffer.append(el)
        buffer_len+=len_

if(buffer_len>0):
    res_data.append(buffer)

print(res_data)

0
from sys import getsizeof
import math
def chunkify_list(L, max_size_kb):
    chunk_size_elements = int(math.ceil(len(L)/int(math.ceil(getsizeof(L)/(1024*max_size_kb)))))
    return [L[x: x+chunk_size_elements] for x in range(0, len(L), chunk_size_elements)]

我写了这段代码,它对我有效。 它需要访问数学库。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接