Spark创建NumPy数组的RDD的最快方式

3

我的Spark应用程序使用numpy数组的RDD。
目前,我正在从AWS S3读取数据,它以简单的文本文件表示,每行是一个向量,每个元素由空格分隔,例如:

1 2 3
5.1 3.6 2.1
3 0.24 1.333

我正在使用numpy的函数loadtxt()来创建numpy数组。
然而,这种方法似乎非常缓慢,我的应用程序花费了太多时间(我认为)将我的数据集转换为numpy数组。

你能否建议我更好的方法?例如,我应该将我的数据集保留为二进制文件吗?还是我应该用另一种方式创建RDD?

以下是我创建RDD的一些代码:

data = sc.textFile("s3_url", initial_num_of_partitions).mapPartitions(readData)

readData函数:

 def readPointBatch(iterator):
     return [(np.loadtxt(iterator,dtype=np.float64)]

1
可能导致速度慢的一个原因是您的文件没有分割。将输入文件分割成多个在S3上,并使用通配符匹配所有文件在您的URL中。 - Paul K.
@PaulK。你说的不分割是什么意思?如果我的文件是file1.txt、file2.txt等等,那么URL就会是s3n://bucket//file*.txt? - member555
你在这里并没有给我们足够的信息。是什么让你认为 numpy 在这里真的是个问题?如果你读取文件并只是在空格上分割 data = sc.textFile("s3_url").map(str.split),它会显著地更快吗? - zero323
@zero323 numpy 是一个问题,因为它的 loadtxt 函数非常慢。 - member555
3个回答

4

使用numpy.fromstring进行简单的映射会更符合惯用语并且速度稍快,具体操作如下:

import numpy as np.

path = ...
initial_num_of_partitions = ...

data = (sc.textFile(path, initial_num_of_partitions)
   .map(lambda s: np.fromstring(s, dtype=np.float64, sep=" ")))

但是忽略这一点,你的方法并没有什么特别的问题。就我所知,在基本配置下,它大约比简单地读取数据慢两倍,比创建虚拟numpy数组稍微慢一些。

因此,看起来问题出在其他地方。可能是集群配置不正确、从S3中获取数据的成本过高,甚至是不切实际的期望值。


众所周知,numpy的loadtxt函数非常慢。与此相比,pandas库中的read_csv()函数超级快速。不幸的是,它不能像np.loadtxt()那样接受Python生成器作为输入。此外,我必须使用mapPartitions,因为我会在每个分区查看整个点块,而不是逐个点。问题是我只能通过一个生成器来访问txt文件。 - member555
与运行任务的成本相比,这个开销微不足道,正如我之前提到的,经过基本的基准测试,它最多比初始化数组慢30%。关于分区 - SparkContext.textFile 不会创建有意义的分区,因此任何只是查看分区的逻辑在设计上都存在问题。 - zero323
在我的服务器上,每个分区处理32MB的时间不到2秒。如果你无法做到低于10秒,那很可能是配置问题。 - zero323
顺便问一下,您的np.fromstring()方法和np.array([float(x) for x in line.split(' ')])有什么区别?因为我觉得这个方法也很慢。 - member555
对于初学者来说,至少需要进行一种类型转换。但正如我之前所说的那样 - 大部分工作似乎是创建numpy数组而不是解析本身。Pandas可能更快,因为它只需要每列一个数组,而不是每行一个数组。 - zero323
显示剩余8条评论

2

在使用Spark时,不应该使用numpy。Spark有自己的数据处理方法,确保您有时非常大的文件不会一次性加载到内存中超出内存限制。您应该按照以下方式使用Spark加载文件:

data = sc.textFile("s3_url", initial_num_of_partitions) \
    .map(lambda row: map(lambda x: float(x), row.split(' ')))

现在,根据你的示例,这将输出如下的RDD
>>> print(data.collect())
[[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]

@edit 关于文件格式和numpy使用的一些建议:

文本文件与CSV、TSV、Parquet或任何您熟悉的格式一样好用。根据Spark文档中有关二进制文件加载的说明,不推荐使用二进制文件:

binaryFiles(path, minPartitions=None)

注意:实验性质

从HDFS、本地文件系统(所有节点都可用)或任何Hadoop支持的文件系统URI中读取一个二进制文件目录作为字节数组。每个文件都被读取为单个记录,并以键值对返回,其中键是每个文件的路径,值是每个文件的内容。

注意:小文件优先,大文件也可以,但可能会导致性能下降。

至于numpy的使用,如果我是你,我肯定会尝试用原生Spark替换任何外部包,例如用pyspark.mlib.random进行随机化:http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.random


我的Spark算法使用了NumPy矩阵乘法和随机函数。如果按照你的方式表示向量,对我来说会非常不方便。另外,我想问一下,是应该将文件保留为文本文件,还是使用其他格式以获得更好的性能?谢谢。 - member555
文本文件和 CSV、TSV、Parquet 或其他你熟悉的格式一样好用。二进制文件不被推荐使用,根据 Spark 文档中有关二进制文件加载的说明:注意:小文件是首选,大文件也可以,但可能会导致性能不佳。 - Nhor
1
关于numpy的使用,如果我是你,我肯定会尝试用本地的Spark替换任何外部包,例如使用pyspark.mlib.random进行随机化:http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.random - Nhor
我的文件不是键值对。我知道使用原生的 Spark 更好,但是目前除了使用 numpy 之外,我没有看到其他选项。那么,保留表示向量的 GB 级别大小的文件的最佳方法是什么?你认为文本文件会好吗? - member555
你好Nhor,对我没用。我创建了一个简单的npy文件:matrix = np.array([[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]) np.save("/tmp/test",matrix),然后上传到S3。现在,我正在尝试使用你的代码片段读取它,并得到以下错误:UnicodeEncodeError: 'decimal' codec can't encode character u'\ufffd' in position 0: invalid decimal Unicode string。你有什么想法我做错了什么吗? - Ivan Fernandez

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接