下面可以找到有关代码的原始答案。
首先,您需要区分不同类型的API,每种API都有其自身的性能考虑。
RDD API
(基于JVM编排的纯Python结构)
这是最受Python代码性能和PySpark实现细节影响的组件。虽然Python性能很少会成为问题,但至少有几个因素需要考虑:
- JVM通信开销。由于几乎所有进出Python执行程序的数据都需要通过套接字和JVM工作线程传递,因此虽然这是相对高效的本地通信,但它仍然不是免费的。
基于进程的执行程序(Python)与基于线程的执行程序(单个JVM多个线程)(Scala)。每个Python执行程序都在自己的进程中运行。作为副作用,它提供比其JVM同类更强的隔离性以及一些关于执行器生命周期的控制,但可能会导致显着更高的内存使用:
- 解释器内存占用
- 已加载库的占用空间
- 广播效率较低(每个进程都需要其自己的广播副本)
Python代码本身的性能。通常来说,Scala比Python更快,但这将因任务而异。此外,您有多种选项,包括像Numba这样的JIT、C扩展(Cython)或专业库,如Theano。最后,如果您不使用ML/MLlib(或简单的NumPy堆栈),请考虑使用PyPy作为替代解释器。参见SPARK-3094。
- PySpark配置提供了
spark.python.worker.reuse
选项,可用于选择每个任务的Python进程复制或重用现有进程。后者似乎有助于避免昂贵的垃圾回收(这更多是一种印象,而不是系统测试的结果),而前者(默认值)在涉及昂贵的广播和导入时是最优的情况。
- 引用计数被用作CPython中的第一行垃圾回收方法,它与典型的Spark工作负载(流式处理,无引用循环)非常配合,减少了长时间GC暂停的风险。
MLlib
(混合 Python 和 JVM 执行)
基本考虑与以前相同,但还有一些额外的问题。虽然 MLlib 使用的基础结构是纯 Python RDD 对象,但所有算法都是直接使用 Scala 执行的。
这意味着需要将 Python 对象转换为 Scala 对象,反之亦然,增加了内存使用量和一些额外的限制,稍后我们将介绍。
截至目前(Spark 2.x),基于 RDD 的 API 处于维护模式,在 Spark 3.0 中计划删除。
DataFrame API 和 Spark ML
(JVM 执行,Python 代码仅限于驱动程序)
对于标准数据处理任务,这可能是最好的选择。由于 Python 代码大多限于驱动程序上的高级逻辑操作,因此 Python 和 Scala 之间应该没有性能差异。
单个例外是使用逐行Python UDF,它们比其Scala等效部分效率低得多。虽然有一些改进的机会(在Spark 2.0.0中已经有了大量开发),但最大的限制是内部表示(JVM)和Python解释器之间的完全往返。如果可能,您应该优先选择内置表达式的组合(example。Python UDF行为在Spark 2.0.0中得到了改善,但与本地执行相比仍然不够优化。
这个问题通过引入向量化UDF(SPARK-21190和更多扩展)得到了显著改善,它使用Arrow Streaming进行高效的数据交换,零拷贝反序列化。对于大多数应用程序,它们的次要开销可以被忽略。
还要确保避免在DataFrames
和RDDs
之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说数据传输到和从Python解释器。
值得注意的是,Py4J调用具有相当高的延迟。这包括简单的调用,如:
from pyspark.sql.functions import col
col("foo")
通常情况下,这并不重要(开销是固定的且与数据量无关),但对于软实时应用程序,您可以考虑缓存/重用Java包装器。
GraphX和Spark数据集
截至目前为止(Spark 1.6 2.1),两者均未提供PySpark API,因此可以说PySpark比Scala差得多。
实际上,GraphX的开发几乎完全停止了,该项目目前处于维护模式,与之相关的JIRA票据已经关闭,标记为无法修复。 GraphFrames 库提供了一个具有Python绑定的替代图形处理库。
主观地说,在 Python 中并没有太多的静态类型化的Datasets
的空间,即使有,当前的Scala实现也过于简单,无法提供与DataFrame
相同的性能优势。
流式处理
从我目前看到的情况来看,我强烈建议使用Scala而不是Python。如果PySpark获得结构化流的支持,这种情况可能会改变,但现在Scala API似乎更加健壮、全面和高效。我的经验非常有限。
Spark 2.x中的结构化流似乎缩小了语言之间的差距,但现在仍处于早期阶段。尽管如此,RDD基础API已被引用为“传统流”在
Databricks文档(访问日期2017-03-03)中,因此可以合理地预期进一步的统一努力。
非性能考虑
并非所有Spark功能都通过PySpark API公开。请确保检查您需要的部分是否已经实现,并尝试了解可能的限制。
当您使用MLlib和类似的混合上下文(请参见
从任务调用Java/Scala函数)时,这尤其重要。公正地说,PySpark API的某些部分,如
mllib.linalg
,提供了比Scala更全面的方法集。
The PySpark API与其Scala版本密切相关,因此不完全符合Pythonic。这意味着虽然在语言之间进行映射相当容易,但同时,Python代码可能会更难理解。
相对于纯JVM执行而言,PySpark数据流程相对复杂。要推理或调试PySpark程序要困难得多。此外,至少需要对Scala和JVM有基本的了解。
持续向
Dataset
API转移,使用冻结RDD API为Python用户带来了机遇和挑战。虽然API的高级部分在Python中更容易暴露,但更高级的功能几乎不可能直接使用。
此外,原生Python函数在SQL世界中仍然是二等公民。希望随着Apache Arrow序列化(
当前努力针对数据collection
),这将在未来得到改善,但UDF serde是一个
长期目标。
对于严重依赖Python代码库的项目,纯Python替代方案(如
Dask或
Ray)可能是一个有趣的选择。
不必是一对另一个
Spark DataFrame(SQL,Dataset)API提供了一种优雅的方式来在PySpark应用程序中集成Scala / Java代码。您可以使用
DataFrames
将数据暴露给本机JVM代码并读取结果。我已经在
其他地方解释了一些选项,并且您可以在
如何在Pyspark中使用Scala类中找到Python-Scala往返的工作示例。
可以通过引入用户定义类型来进一步增强它(请参见
如何为Spark SQL定义自定义类型架构?)。
问题在于提供的代码有什么问题
(免责声明:Pythonista观点。很可能我错过了一些Scala技巧)
首先,你的代码中有一个部分根本没有意义。如果你已经使用zipWithIndex
或enumerate
创建了(key, value)
对,那么创建字符串再立即拆分它的意义何在?flatMap
不会递归地工作,因此您可以简单地产生元组并跳过以下的map
。
我发现另一个有问题的部分是reduceByKey
。一般来说,如果应用聚合函数可以减少必须洗牌的数据量,则reduceByKey
很有用。由于您只是连接字符串,因此这里没有任何可获得的利益。忽略低级别的东西,比如引用数量,您必须传输的数据量与groupByKey
完全相同。
通常我不会过多纠结这个问题,但据我所知,这是你的Scala代码中的瓶颈。在JVM上连接字符串是一项相当昂贵的操作(例如:
Is string concatenation in scala as costly as it is in Java?)。这意味着像这样的代码
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
(在你的代码中等同于
input4.reduceByKey(valsConcat)
)并不是一个好主意。
如果你想避免使用
groupByKey
,可以尝试使用
aggregateByKey
和
StringBuilder
。类似下面的代码应该能解决问题:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
我觉得这件事情值不值得大惊小怪还有待商榷。
考虑到上述情况,我已经按照以下方式重写了你的代码:
Scala:
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
Python:
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
结果
在 local[6]
模式下(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器有4GB内存,需要(n = 3):
- Scala - 平均值:250.00秒,标准偏差:12.49
- Python - 平均值:246.66秒,标准偏差:1.15
我相信大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。仅仅为了好玩,在这台机器上,以下是用 Python 编写的朴素单线程代码,可以在不到一分钟内完成相同的任务:
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])