zip_longest对于左侧列表的应用方法

33

我知道zip函数(它将根据最短的列表进行压缩)和zip_longest函数(它将根据最长的列表进行压缩),但是如何根据第一个列表进行压缩,而不管它是否是最长的呢?

例如:

Input:  ['a', 'b', 'c'], [1, 2]
Output: [('a', 1), ('b', 2), ('c', None)]

但也包括:

Input:  ['a', 'b'], [1, 2, 3]
Output: [('a', 1), ('b', 2)]

这两种功能是否可以在一个函数中同时存在?

10个回答

35

解决方案

将重复的填充值链接到除第一个可迭代对象之外的其他可迭代对象后面:

from itertools import chain, repeat

def zip_first(first, *rest, fillvalue=None):
    return zip(first, *map(chain, rest, repeat(repeat(fillvalue))))

或者使用 zip_longest 并用 compresszip 技巧进行修剪:

def zip_first(first, *rest, fillvalue=None):
    a, b = tee(first)
    return compress(zip_longest(b, *rest, fillvalue=fillvalue), zip(a))

就像 zipzip_longest 一样,这些函数接受任何数量(至少一个)的任何类型的可迭代对象(包括无限制的对象),并返回一个迭代器(如果需要,可以转换为列表)。

基准测试结果

与其他同样通用的解决方案进行基准测试(所有代码都在答案的末尾):

10 iterables of 10,000 to 90,000 elements, first has 50,000:
────────────────────────────────────────────────────────────
 2.2 ms   2.2 ms   2.3 ms  limit_cheat
 2.6 ms   2.6 ms   2.6 ms  Kelly_Bundy_chain
 3.3 ms   3.3 ms   3.3 ms  Kelly_Bundy_compress
50.2 ms  50.6 ms  50.7 ms  CrazyChucky
54.7 ms  55.0 ms  55.0 ms  Sven_Marnach
74.8 ms  74.9 ms  75.0 ms  Mad_Physicist
 5.4 ms   5.4 ms   5.4 ms  Kelly_Bundy_3
 5.9 ms   6.0 ms   6.0 ms  Kelly_Bundy_4
 4.6 ms   4.7 ms   4.7 ms  Kelly_Bundy_5

10,000 iterables of 0 to 100 elements, first has 50:
────────────────────────────────────────────────────
 4.6 ms   4.7 ms   4.8 ms  limit_cheat
 4.8 ms   4.8 ms   4.8 ms  Kelly_Bundy_compress
 8.4 ms   8.4 ms   8.4 ms  Kelly_Bundy_chain
27.1 ms  27.3 ms  27.5 ms  CrazyChucky
38.3 ms  38.5 ms  38.7 ms  Sven_Marnach
73.0 ms  73.0 ms  73.1 ms  Mad_Physicist
 4.9 ms   4.9 ms   5.0 ms  Kelly_Bundy_3
 4.9 ms   4.9 ms   5.0 ms  Kelly_Bundy_4
 5.0 ms   5.0 ms   5.0 ms  Kelly_Bundy_5


第一个是一个作弊方法,它知道长度,这是为了展示我们能够达到的最快速度的限制。
解释:
上述两种解决方案的简要说明:
第一种解决方案,如果与三个可迭代对象一起使用,则相当于以下内容:
def zip_first(first, second, third, fillvalue=None):
    filler = repeat(fillvalue)
    return zip(first,
               chain(second, filler),
               chain(third, filler))

第二种方法基本上是让zip_longest来完成工作。唯一的问题是,当第一个可迭代对象完成时,它并不会停止。所以我使用tee复制第一个可迭代对象,然后用一个用于其元素,另一个用于其长度。zip(a)将每个元素包装在1元组中,非空元组为true。因此,compress给出了所有由zip_longest产生的元组,正好有第一个可迭代对象中的元素个数。

基准测试代码 (
def limit_cheat(*iterables, fillvalue=None):
    return islice(zip_longest(*iterables, fillvalue=fillvalue), cheat_length)

def Kelly_Bundy_chain(first, *rest, fillvalue=None):
    return zip(first, *map(chain, rest, repeat(repeat(fillvalue))))

def Kelly_Bundy_compress(first, *rest, fillvalue=None):
    a, b = tee(first)
    return compress(zip_longest(b, *rest, fillvalue=fillvalue), zip(a))

def CrazyChucky(*iterables, fillvalue=None):
    SENTINEL = object()
    
    for first, *others in zip_longest(*iterables, fillvalue=SENTINEL):
        if first is SENTINEL:
            return
        others = [i if i is not SENTINEL else fillvalue for i in others]
        yield (first, *others)

def Sven_Marnach(first, *rest, fillvalue=None):
    rest = [iter(r) for r in rest]
    for x in first:
        yield x, *(next(r, fillvalue) for r in rest)

def Mad_Physicist(*args, fillvalue=None):
    # zip_by_first('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
    # zip_by_first('ABC', 'xyzw', fillvalue='-') --> Ax By Cz
    if not args:
        return
    iterators = [iter(it) for it in args]
    while True:
        values = []
        for i, it in enumerate(iterators):
            try:
                value = next(it)
            except StopIteration:
                if i == 0:
                    return
                iterators[i] = repeat(fillvalue)
                value = fillvalue
            values.append(value)
        yield tuple(values)

def Kelly_Bundy_3(first, *rest, fillvalue=None):
    a, b = tee(first)
    return map(itemgetter(1), zip(a, zip_longest(b, *rest, fillvalue=fillvalue)))

def Kelly_Bundy_4(first, *rest, fillvalue=None):
    sentinel = object()
    for z in zip_longest(chain(first, [sentinel]), *rest, fillvalue=fillvalue):
        if z[0] is sentinel:
            break
        yield z

def Kelly_Bundy_5(first, *rest, fillvalue=None):
    stopped = False
    def stop():
        nonlocal stopped
        stopped = True
        return
        yield
    for z in zip_longest(chain(first, stop()), *rest, fillvalue=fillvalue):
        if stopped:
            break
        yield z


import timeit
from itertools import chain, repeat, zip_longest, islice, tee, compress
from operator import itemgetter
from collections import deque

funcs = [
    limit_cheat,
    Kelly_Bundy_chain,
    Kelly_Bundy_compress,
    CrazyChucky,
    Sven_Marnach,
    Mad_Physicist,
    Kelly_Bundy_3,
    Kelly_Bundy_4,
    Kelly_Bundy_5,
]

def test(args_creator):

    # Correctness
    expect = list(funcs[0](*args_creator()))
    for func in funcs:
        result = list(func(*args_creator()))
        print(result == expect, func.__name__)
    
    # Speed
    tss = [[] for _ in funcs]
    for _ in range(5):
        print()
        print(args_creator.__name__)
        for func, ts in zip(funcs, tss):
            t = min(timeit.repeat(lambda: deque(func(*args_creator()), 0), number=1))
            ts.append(t)
            print(*('%4.1f ms ' % (t * 1e3) for t in sorted(ts)[:3]), func.__name__)

def args_few_but_long_iterables():
    global cheat_length
    cheat_length = 50_000
    first = repeat(0, 50_000)
    rest = [repeat(i, 10_000 * i) for i in range(1, 10)]
    return first, *rest

def args_many_but_short_iterables():
    global cheat_length
    cheat_length = 50
    first = repeat(0, 50)
    rest = [repeat(i, i % 101) for i in range(1, 10_000)]
    return first, *rest

test(args_few_but_long_iterables)
funcs[1:3] = funcs[1:3][::-1]
test(args_many_but_short_iterables)


1
我完全忘记了 map 的额外参数,所以花了好2-3分钟才意识到你在这里做什么 :) 我唯一的建议是添加参数 fillvalue=None 并将 repeat(repeat(None)) 更改为 repeat(repeat(fillvalue)) - Mad Physicist
@MadPhysicist 好的,我也会尝试一下。我刚刚完成了一个基准测试,使用 first = repeat(0, 50_000)rest = [repeat(i, 10_000 * i) for i in range(1, 10)],你的代码需要 75 毫秒,而我的只需要 2.7 毫秒 :-D - Kelly Bundy
那应该足够了。你能在这里发布测试代码吗? - Mad Physicist
@MadPhysicist 现在又加入了另一个解决方案和一个作弊方法。看起来至少对于这个测试用例,我已经接近了可能性的极限。我还尝试了使用100个可迭代对象(first = repeat(0, 5_000)rest = [repeat(i, 100 * i) for i in range(1, 100)]),结果变化不大。 - Kelly Bundy
1
@CrazyChucky 在我的基准测试中,使用少量但长的可迭代对象没有区别,但在我另一个基准测试中(刚刚添加),使用许多但短的可迭代对象会更慢:大约24.9毫秒而不是18.6毫秒。通常情况下,当两者都使用函数时,map比列表推导更快。更重要的是,您为每个可迭代对象创建一个repeat,而我只创建一个共享的repeat,并将其链接到所有可迭代对象上。您的带有单个准备好的列表推导也需要大约19.5毫秒。 - Kelly Bundy
显示剩余6条评论

9
你可以转换“大致相等”的Python代码,该代码在 itertools.zip_longest 文档中显示,以创建根据第一个参数的长度进行压缩的通用版本:
from itertools import repeat

def zip_by_first(*args, fillvalue=None):
    # zip_by_first('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
    # zip_by_first('ABC', 'xyzw', fillvalue='-') --> Ax By Cz
    if not args:
        return
    iterators = [iter(it) for it in args]
    while True:
        values = []
        for i, it in enumerate(iterators):
            try:
                value = next(it)
            except StopIteration:
                if i == 0:
                    return
                iterators[i] = repeat(fillvalue)
                value = fillvalue
            values.append(value)
        yield tuple(values)

您可能可以通过缓存repeat(fillvalue)或类似方式来进行一些小的优化。这种实现的问题在于,它是用Python编写的,而大多数itertools使用速度更快的C实现。您可以通过与Kelly Bundy的答案进行比较来看到其效果。


如果您只是急切地使用“chain”来代替其他所有内容,那么代码会变得更短... - o11c
@o11c。不确定我如何看待chain在这里的应用。 - Mad Physicist
@o11c。我没有使用链,但是代码变得更短了一些。 - Mad Physicist
@MadPhysicist 大多数情况下,iterators = [chain(it, repeat(fillvalue)) for ...],但第一个不要这样做。然后你可以用 yield from zip() 代替整个循环。 - o11c
2
@o11c 最好使用return返回zip,否则你会毫无意义地减缓它的速度。 - Kelly Bundy
2
@o11c 我在我的基准测试中尝试了一下,使用 yield from 将速度从 2.6 毫秒降低到了 3.9 毫秒。 - Kelly Bundy

7

下面是另一种看法,如果目标是可读性强、易于理解的代码:

def zip_first(first, *rest, fillvalue=None):
    rest = [iter(r) for r in rest]
    for x in first:
        yield x, *(next(r, fillvalue) for r in rest)

这里使用了next()的两个参数形式,以返回所有已用完迭代器的填充值。

对于仅包含两个可迭代对象的情况,可以简化成以下方式:

def zip_first(first, second, fillvalue=None):
    second = iter(second)
    for x in first:
        yield x, next(second, fillvalue)

是的,这也可以。 - Mad Physicist
版本号为2的是Jan's应该使用的版本... 乍一看它看起来完全不同,但它只是重新实现了for和两个参数的next。无论如何,我现在已经将您的通用版本包含在我的基准测试中。 - Kelly Bundy

5
让第二个无限,然后只使用常规的zip函数:
from itertools import chain, repeat

a = ['a', 'b', 'c']
b = [1, 2]

b = chain(b, repeat(None))

print(*zip(a, b))

1

zip_longest中仅返回len(a)个元素:

from itertools import zip_longest

def zip_first(a, b):
    z = zip_longest(a, b)
    for i, r in zip(range(len(a)), z):
        yield r

3
如果那不是一个列表怎么办? - Mad Physicist
问题陈述说它是一个列表,示例也是这样。 - Jan Christoph Terasa
可能会从两个迭代器中产生元素,直到第一个参数上出现 StopIteration - Jan Christoph Terasa
2
好的。我制作了一个相当通用的解决方案,仅依赖于第一个参数中的 StopIteration - Mad Physicist
你可以使用 yield from islice(z, len(a)) 代替 for _, r in zip(range(len(a)), z): yield r,其中 islice 是从 itertools 导入的。 - Stef
1
@Stef,实际上最好返回islice(就像mkrieger的答案一样)。将其包装在生成器中会使其变慢。 - Kelly Bundy

1

这个方法可能不太美观,但我会选择它。思路是如果第二个列表比第一个长,我们就将其截短至与第一个相同长度。然后使用zip_longest函数确保结果至少与zip的第一个参数一样长。

import itertools

input1 = [['a', 'b', 'c'], [1, 2]]
input2 = [['a', 'b'], [1, 2, 3]]

zip1 = itertools.zip_longest(input1[0], input1[1][:len(input1[0])])
zip2 = itertools.zip_longest(input2[0], input2[1][:len(input2[0])])

print(list(zip1))
print(list(zip2))

输出:

[('a', 1), ('b', 2), ('c', None)]
[('a', 1), ('b', 2)]

可以使用以下方法将多个列表压缩成zip格式:

import itertools

def zip_first(lists):
    equal_lists = [l[:len(lists[0])] for l in lists]
    return itertools.zip_longest(*equal_lists)

1
我不知道有现成的,但你可以定义自己的方法。
使用 `object()` 作为一个标志物可以确保它始终被测试为唯一值,并且永远不会与 `None` 或任何其他填充值混淆。因此,即使你的可迭代对象中包含 `None`,这个方法也应该能够正常工作。
与 `zip_longest` 类似,它可以接受任意数量的可迭代对象(不一定是两个),并且你可以指定 `fillvalue`。
from itertools import zip_longest

def zip_left(*iterables, fillvalue=None):
    SENTINEL = object()
    
    for first, *others in zip_longest(*iterables, fillvalue=SENTINEL):
        if first is SENTINEL:
            return
        others = [i if i is not SENTINEL else fillvalue for i in others]
        yield (first, *others)


print(list(zip_left(['a', 'b', 'c'], [1, 2])))
print(list(zip_left(['a', 'b'], [1, 2, 3])))

输出:

[('a', 1), ('b', 2), ('c', None)]
[('a', 1), ('b', 2)]

1
如果存在一个合法的 None,该怎么办? - Mad Physicist
@MadPhysicist 谢谢,已编辑。 - CrazyChucky
1
在函数内部生成哨兵,我也会相信它。现在,它是公开可访问的,因此容易出错。除此之外,你的想法是正确的。 - Mad Physicist
1
此外,您应该在此处检查iterables不为空(+1)。解决方案的尴尬之处在于现在您有了zip_longest,并且您的函数基本上对每个项目运行两次相同的检查。 - Mad Physicist
有趣。我一直很好奇那是怎么回事,但它在某个时刻停止了。现在我的机器上又是3.6.9版本了。 - Mad Physicist
显示剩余6条评论

1

对于通用迭代器(或列表),您可以使用以下方法。我们返回一对值,直到在a上遇到StopIteration。如果首先在b上遇到StopIteration,则将None用作第二个值。

def zip_first(a, b):
    ai, bi = iter(a), iter(b)
    while True:
        try:
            aa = next(ai)
        except StopIteration:
            return           
        try:
            bb = next(bi)
        except StopIteration:
            bb = None
        yield aa, bb

2
这段程序可以缩短成: for aa in a: yield aa, next(bi, None) - Kelly Bundy

1
如果输入是列表(或其他可用于len的收集),则可以使用zip_longest并将结果惰性地限制为第一个列表的长度1,通过使用islice:
from itertools import islice, zip_longest

def zip_first(a, b):
    return islice(zip_longest(a, b), len(a))

1这个基本想法来自于Jan Christoph Terasa的答案


1

我不知道,但是

first = ['a', 'b', 'c']
last = [1, 2, 3, 4]
if len(first) < len(last):
    b = list(zip(first, last))
else:
    b = list(zip_longest(first, last))
print(b)

如果有超过两个可迭代对象怎么办?例如 zip(a, b, c) - aminrd

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接