将基于ID的numpy行转换为列

3
假设我有一个numpy数组,它映射了两种物品类型之间的ID:
[[1, 12],
 [1, 13],
 [1, 14],
 [2, 13],
 [2, 14],
 [3, 11]]

我希望重新排列这个数组,使得新数组中的每一行表示原始数组中匹配同一ID的所有项目。在这里,每一列将代表原始数组中的一个映射,直到新数组中列数的形状限制为止。如果我们想从上面的数组中获得这个结果,并确保我们只有2列,那么我们会得到:

[[12, 13],  #Represents 1 - 14 was not kept as only 2 columns are allowed
 [13, 14],  #Represents 2
 [11,  0]]  #Represents 3 - 0 was used as padding since 3 did not have 2 mappings

这里的朴素方法是使用 for 循环,当遇到原始数组中的行时,填充新数组。是否有更有效的方法使用 numpy 的功能来完成这个任务呢?


非常类似于此问题:https://dev59.com/0VoT5IYBdhLWcg3w8S62,但不是完全重复的。 - AChampion
4个回答

3

这里是一种通用的、大部分基于Numpy的方法:

In [144]: def array_packer(arr):
     ...:     cols = arr.shape[1]
     ...:     ids = arr[:, 0]
     ...:     inds = np.where(np.diff(ids) != 0)[0] + 1
     ...:     sp = np.split(arr[:,1:], inds)
     ...:     result = [np.unique(a[: cols]) if a.shape[0] >= cols else
     ...:                    np.pad(np.unique(a), (0, (cols - 1) * (cols - a.shape[0])), 'constant')
     ...:                 for a in sp]
     ...:     return result
     ...:     
     ...:     

示例:

In [145]: a = np.array([[1, 12, 15, 45],
     ...:  [1, 13, 23, 9],
     ...:  [1, 14, 14, 11],
     ...:  [2, 13, 90, 34],
     ...:  [2, 14, 23, 43],
     ...:  [3, 11, 123, 53]])
     ...:  

In [146]: array_packer(a)
Out[146]: 
[array([ 9, 11, 12, 13, 14, 15, 23, 45,  0,  0,  0]),
 array([13, 14, 23, 34, 43, 90,  0,  0,  0,  0,  0,  0]),
 array([ 11,  53, 123,   0,   0,   0,   0,   0,   0,   0,   0,   0])]

In [147]: a = np.array([[1, 12, 15],
     ...:  [1, 13, 23],
     ...:  [1, 14, 14],
     ...:  [2, 13, 90],
     ...:  [2, 14, 23],
     ...:  [3, 11, 123]])
     ...: 
     ...:   
     ...:  

In [148]: array_packer(a)
Out[148]: 
[array([12, 13, 14, 15, 23]),
 array([13, 14, 23, 90,  0,  0]),
 array([ 11, 123,   0,   0,   0,   0])]

2

对于这个问题,朴素的for循环实际上是一个相当高效的解决方案:

from collections import defaultdict, deque
d = defaultdict(lambda: deque((0, 0), maxlen=2))

%%timeit
for key, val in a:
    d[key].append(val)
4.43 µs ± 29.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

# result: {1: deque([13, 14]), 2: deque([13, 14]), 3: deque([0, 11])}

作为比较,这个帖子中提出的numpy解决方案慢了4倍:

%timeit [[*a[a[:,0]==i,1],0][:2] for i in np.unique(a[:,0])]
18.6 µs ± 336 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

Numpy很棒,我自己也经常使用它,但在这种情况下,我认为它有些繁琐。


不错的算法视野! - Mazdak
1
关于您的基准测试,请使用大数组运行基准测试。另外请注意,您需要用零而不是None填充数组。此外,填充应该从右侧开始。 - Mazdak
我把填充从“无”改成了“0”。感谢@Kasramvd指出这一点! - Piotr
我刚刚在一个列表上进行了10000次计时,使用numpy方法速度更快,我希望有人可以验证一下时间,这样就不会显得有偏见。 - user3483203

2

这里提供一种使用稀疏矩阵的方法:

def pp(map_, maxitems=2):
    M = sparse.csr_matrix((map_[:, 1], map_[:, 0], np.arange(map_.shape[0]+1)))
    M = M.tocsc()
    sizes = np.diff(M.indptr)
    ids, = np.where(sizes)
    D = np.concatenate([M.data, np.zeros((maxitems - 1,), dtype=M.data.dtype)])
    D = np.lib.stride_tricks.as_strided(D, (D.size - maxitems + 1, maxitems),
                                        2 * D.strides)
    result = D[M.indptr[ids]]
    result[np.arange(maxitems) >= sizes[ids, None]] = 0
    return result

使用@crisz的代码计时,但修改了使用较少重复测试数据。此外,我添加了一些“验证”:chrisz和我的解决方案给出相同的答案,另外两个输出不同的格式,因此我无法检查它们。

输入图像描述

代码:

from scipy import sparse
import numpy as np
from collections import defaultdict, deque

def pp(map_, maxitems=2):
    M = sparse.csr_matrix((map_[:, 1], map_[:, 0], np.arange(map_.shape[0]+1)))
    M = M.tocsc()
    sizes = np.diff(M.indptr)
    ids, = np.where(sizes)
    D = np.concatenate([M.data, np.zeros((maxitems - 1,), dtype=M.data.dtype)])
    D = np.lib.stride_tricks.as_strided(D, (D.size - maxitems + 1, maxitems),
                                        2 * D.strides)
    result = D[M.indptr[ids]]
    result[np.arange(maxitems) >= sizes[ids, None]] = 0
    return result

def chrisz(a):
  return [[*a[a[:,0]==i,1],0][:2] for i in np.unique(a[:,0])]

def piotr(a):
  d = defaultdict(lambda: deque((0, 0), maxlen=2))
  for key, val in a:
    d[key].append(val)
  return d

def karams(arr):
  cols = arr.shape[1]
  ids = arr[:, 0]
  inds = np.where(np.diff(ids) != 0)[0] + 1
  sp = np.split(arr[:,1:], inds)
  result = [a[:2].ravel() if a.size >= cols else np.pad(a.ravel(), (0, cols -1 * (cols - a.size)), 'constant')for a in sp]
  return result

def make(nid, ntot):
    return np.c_[np.random.randint(0, nid, (ntot,)),
                 np.random.randint(0, 2**30, (ntot,))]

from timeit import timeit
import pandas as pd
import matplotlib.pyplot as plt

res = pd.DataFrame(
       index=['pp', 'chrisz', 'piotr', 'karams'],
       columns=[10, 50, 100, 500, 1000, 5000, 10000],# 50000],
       dtype=float
)

for c in res.columns:
#        l = np.repeat(np.array([[1, 12],[1, 13],[1, 14],[2, 13],[2, 14],[3, 11]]), c, axis=0)
    l = make(c // 2, c * 6)
    assert np.all(chrisz(l) == pp(l))
    for f in res.index:
        stmt = '{}(l)'.format(f)
        setp = 'from __main__ import l, {}'.format(f)
        res.at[f, c] = timeit(stmt, setp, number=30)

ax = res.div(res.min()).T.plot(loglog=True)
ax.set_xlabel("N");
ax.set_ylabel("time (relative)");

plt.show()

我怀疑我的计时在重复结果上很好,因为它只循环了三次。不错的方法! - user3483203

1

这段内容经过轻微修改,从几乎相同的内容中挑选出两个元素进行填充和选择:

[[*a[a[:,0]==i,1],0][:2] for i in np.unique(a[:,0])]

输出:

[[12, 13], [13, 14], [11, 0]]

如果你想追踪键:
{i:[*a[a[:,0]==i,1],0][:2] for i in np.unique(a[:,0])}

# {1: [12, 13], 2: [13, 14], 3: [11, 0]}

功能

def chrisz(a):
  return [[*a[a[:,0]==i,1],0][:2] for i in np.unique(a[:,0])]

def piotr(a):
  d = defaultdict(lambda: deque((0, 0), maxlen=2))
  for key, val in a:
    d[key].append(val)
  return d

def karams(arr):
  cols = arr.shape[1]
  ids = arr[:, 0]
  inds = np.where(np.diff(ids) != 0)[0] + 1
  sp = np.split(arr[:,1:], inds)
  result = [a[:2].ravel() if a.size >= cols else np.pad(a.ravel(), (0, cols -1 * (cols - a.size)), 'constant')for a in sp]
  return result

Timings

from timeit import timeit
import pandas as pd
import matplotlib.pyplot as plt

res = pd.DataFrame(
       index=['chrisz', 'piotr', 'karams'],
       columns=[10, 50, 100, 500, 1000, 5000, 10000, 50000],
       dtype=float
)

for f in res.index:
    for c i

n res.columns:
        l = np.repeat(np.array([[1, 12],[1, 13],[1, 14],[2, 13],[2, 14],[3, 11]]), c, axis=0)
        stmt = '{}(l)'.format(f)
        setp = 'from __main__ import l, {}'.format(f)
        res.at[f, c] = timeit(stmt, setp, number=30)

ax = res.div(res.min()).T.plot(loglog=True)
ax.set_xlabel("N");
ax.set_ylabel("time (relative)");

plt.show()

结果(显然@Kasramvd是获胜者):

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接