PyData Blaze:它是否允许并行处理?

8
我正在寻求并行化numpy或pandas操作。为此,我一直在研究pydata的blaze。我的理解是无缝并行化是它的主要卖点。
不幸的是,我无法找到可以在多个核心上运行的操作。目前blaze中是否可用并行处理或仅为陈述目的?我做错了什么吗?我正在使用blaze v0.6.5。
我希望并行化的一个函数示例:(去重一个太大而无法放入内存的pytables列)
import pandas as pd
import blaze as bz
def f1():
    counter = 0
    groups = pd.DataFrame(columns=['name'])
    t = bz.TableSymbol('t', '{name: string}')
    e = bz.distinct(t)
    for chunk in store.select('my_names', columns=['name'],
                              chunksize=1e5):
        counter += 1
        print('processing chunk %d' % counter)
        groups = pd.concat([groups, chunk])
        groups = bz.compute(e, groups)

编辑1

我在按照菲利普的示例时遇到了问题:

In [1]: from blaze import Data, compute

In [2]: d = Data('test.bcolz')

In [3]: d.head(5)
Out[3]: <repr(<blaze.expr.collections.Head at 0x7b5e300>) failed: NotImplementedError: Don't know how to compute:
expr: _1.head(5).head(11)
data: {_1: ctable((8769257,), [('index', '<i8'), ('date', 'S10'), ('accessDate', 'S26')])
  nbytes: 367.97 MB; cbytes: 35.65 MB; ratio: 10.32
  cparams := cparams(clevel=5, shuffle=True, cname='blosclz')
  rootdir := 'test.bcolz'
[(0L, '2014-12-12', '2014-12-14T17:39:19.716000')
 (1L, '2014-12-11', '2014-12-14T17:39:19.716000')
 (2L, '2014-12-10', '2014-12-14T17:39:19.716000') ...,
 (1767L, '2009-11-11', '2014-12-15T13:32:39.906000')
 (1768L, '2009-11-10', '2014-12-15T13:32:39.906000')
 (1769L, '2009-11-09', '2014-12-15T13:32:39.906000')]}>

我的环境:

C:\Anaconda>conda list blaze
# packages in environment at C:\Anaconda:
#
blaze                     0.6.8               np19py27_69

但请注意,Blaze似乎报告了一个错误的版本:
In [5]: import blaze

In [6]: blaze.__version__
Out[6]: '0.6.7'

对于其他数据源,blaze似乎可以工作:

In [6]: d = Data([1,2,2,2,3,4,4,4,5,6])

In [7]: d.head(5)
Out[7]:
   _2
0   1
1   2
2   2
3   2
4   3

In [16]: list(compute(d._2.distinct()))
Out[16]: [1, 2, 3, 4, 5, 6]

关于评论,您可以尝试从源代码更新吗?pip install git+https://github.com/ContinuumIO/blaze --upgrade - MRocklin
@ARF,请不要忘记接受我的答案,如果您愿意的话。 - Phillip Cloud
1个回答

8

注意:下面的示例需要最新版本的blaze,您可以通过以下方式获取:

conda install -c blaze blaze

你还需要最新版本的新兴 into 项目。你需要从 master 安装 into,可以使用以下命令:
pip install git+git://github.com/ContinuumIO/into.git

你无法使用任意后端进行“无缝”并行处理,但bcolz后端以优美的方式支持并行处理。这里有一个关于纽约市出租车行程/费用数据集的示例。 注意:我将行程和费用数据集合并成了一个数据集。该数据集中有173,179,759行数据。
In [28]: from blaze import Data, compute

In [29]: ls -d *.bcolz
all.bcolz/  fare.bcolz/ trip.bcolz/

In [30]: d = Data('all.bcolz')

In [31]: d.head(5)
Out[31]:
                          medallion                      hack_license  \
0  89D227B655E5C82AECF13C3F540D4CF4  BA96DE419E711691B9445D6A6307C170
1  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
2  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
3  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310
4  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310

  vendor_id  rate_code store_and_fwd_flag     pickup_datetime  \
0       CMT          1                  N 2013-01-01 15:11:48
1       CMT          1                  N 2013-01-06 00:18:35
2       CMT          1                  N 2013-01-05 18:49:41
3       CMT          1                  N 2013-01-07 23:54:15
4       CMT          1                  N 2013-01-07 23:25:03

     dropoff_datetime  passenger_count  trip_time_in_secs  trip_distance  \
0 2013-01-01 15:18:10                4                382            1.0
1 2013-01-06 00:22:54                1                259            1.5
2 2013-01-05 18:54:23                1                282            1.1
3 2013-01-07 23:58:20                2                244            0.7
4 2013-01-07 23:34:24                1                560            2.1

     ...     pickup_latitude  dropoff_longitude  dropoff_latitude  \
0    ...           40.757977         -73.989838         40.751171
1    ...           40.731781         -73.994499         40.750660
2    ...           40.737770         -74.009834         40.726002
3    ...           40.759945         -73.984734         40.759388
4    ...           40.748528         -74.002586         40.747868

   tolls_amount  tip_amount  total_amount  mta_tax  fare_amount  payment_type  \
0             0           0           7.0      0.5          6.5           CSH
1             0           0           7.0      0.5          6.0           CSH
2             0           0           7.0      0.5          5.5           CSH
3             0           0           6.0      0.5          5.0           CSH
4             0           0          10.5      0.5          9.5           CSH

  surcharge
0       0.0
1       0.5
2       1.0
3       0.5
4       0.5

[5 rows x 21 columns]

为了增加基于进程的并行性,我们引入了stdlib模块中的类,并将实例的方法作为关键字参数传递给函数。请注意保留HTML标签。
In [32]: from multiprocessing import Pool

In [33]: p = Pool()

In [34]: %timeit -n 1 -r 1 values = compute(trip.medallion.distinct())
1 loops, best of 1: 1min per loop

In [35]: %timeit -n 1 -r 1 values = compute(trip.medallion.distinct(), map=p.map)
1 loops, best of 1: 16.2 s per loop

因此,多加一行代码大约可以提高3倍的速度。请注意,这是一个字符串列,与其他类型相比往往效率非常低。在整数列上计算的distinct表达式在多核处理器下约1秒钟完成(与3秒钟相比),因此运行时间的改进大约相同。
In [38]: %timeit -n 1 -r 1 values = compute(trip.passenger_count.distinct())
1 loops, best of 1: 3.33 s per loop

In [39]: %timeit -n 1 -r 1 values = compute(trip.passenger_count.distinct(), map=p.map)
1 loops, best of 1: 1.01 s per loop

谢谢Phillip。那真的帮了我很多!我在文档和Continuum博客的各个部分中搜索,但没有找到关于并行处理的具体内容。你是否有一个概述,哪些后端支持并行处理,哪些不支持?如果没有,我会去看代码。无论如何,感谢你! - ARF
目前的bcolz、h5py(用于更多的nd-array操作)和CSV文件集合都可以从pmap函数中受益。当然,如果Pandas释放了GIL,我们就可以加速使用DataFrames的任何操作(向Phil和Jeff伸出肘部) :) - MRocklin
@MRocklin 太好了!bcolz、h5py和CSV对我来说已经足够了...不幸的是,我一直收到NotImplementedError: Don't know how to compute(至少在bcolz中是这样)。请参见OP中的Edit 1。我可以再次请求你们两位指点我正确的方向吗?我没有使用into,而是本地创建了bcolz。有可能是这个原因吗? - ARF
本地执行是理想的方式,使用虚拟环境只是为了方便。我建议按照原始问题中的评论更新源代码。更新后问题是否仍然存在? - MRocklin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接