Scala与Python的Spark性能比较

216

我更喜欢Python而不是Scala。但由于Spark是用Scala本地编写的,出于显而易见的原因,我期望在Scala中运行我的代码比Python版本要快。

基于这个假设,我想学习并编写一些常见数据预处理代码的Scala版本,针对1GB数据的SpringLeaf竞赛中选取的数据进行操作,数据摘自Kaggle。为了简单介绍一下该数据(包含1936个维度和145232行),数据由各种类型组成,例如int、float、string、boolean。我正在使用8个核心中的6个进行Spark处理;因此,我使用minPartitions=6,以便每个核心都有一些东西可以处理。

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python 代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala性能 阶段0(38分钟),阶段1(18秒) enter image description here

Python性能 阶段0(11分钟),阶段1(7秒) enter image description here

两者都产生了不同的DAG可视化图表(因此这两张图片显示了Scala(map)和Python(reduceByKey)不同的阶段0函数)。

但本质上,两段代码都试图将数据转换为(dimension_id,值列表的字符串)RDD并保存到磁盘中。输出将用于计算每个维度的各种统计信息。

就性能而言,对于这样的真实数据,Scala代码似乎比Python版本慢4倍。 对我来说好消息是它给了我留在Python的动力,坏消息是我没有完全理解为什么?


11
也许这取决于编码和应用程序,因为我得到了另一个结果,即当对Leibniz公式的十亿个项求和时,Apache Spark Python比Scala慢 - Paul
6
有趣的问题!顺便看一下这里:http://emptypipes.org/2015/01/17/python-vs-scala-vs-spark/。拥有更多核心,你就越难区分这些语言之间的差异。 - Markon
你是否考虑过接受现有的答案? - 10465355
1个回答

420

下面可以找到有关代码的原始答案。
首先,您需要区分不同类型的API,每种API都有其自身的性能考虑。
RDD API
(基于JVM编排的纯Python结构)
这是最受Python代码性能和PySpark实现细节影响的组件。虽然Python性能很少会成为问题,但至少有几个因素需要考虑:
  • JVM通信开销。由于几乎所有进出Python执行程序的数据都需要通过套接字和JVM工作线程传递,因此虽然这是相对高效的本地通信,但它仍然不是免费的。
  • 基于进程的执行程序(Python)与基于线程的执行程序(单个JVM多个线程)(Scala)。每个Python执行程序都在自己的进程中运行。作为副作用,它提供比其JVM同类更强的隔离性以及一些关于执行器生命周期的控制,但可能会导致显着更高的内存使用:

    • 解释器内存占用
    • 已加载库的占用空间
    • 广播效率较低(每个进程都需要其自己的广播副本)
  • Python代码本身的性能。通常来说,Scala比Python更快,但这将因任务而异。此外,您有多种选项,包括像Numba这样的JIT、C扩展(Cython)或专业库,如Theano。最后,如果您不使用ML/MLlib(或简单的NumPy堆栈),请考虑使用PyPy作为替代解释器。参见SPARK-3094

  • PySpark配置提供了spark.python.worker.reuse选项,可用于选择每个任务的Python进程复制或重用现有进程。后者似乎有助于避免昂贵的垃圾回收(这更多是一种印象,而不是系统测试的结果),而前者(默认值)在涉及昂贵的广播和导入时是最优的情况。
  • 引用计数被用作CPython中的第一行垃圾回收方法,它与典型的Spark工作负载(流式处理,无引用循环)非常配合,减少了长时间GC暂停的风险。

MLlib

(混合 Python 和 JVM 执行)

基本考虑与以前相同,但还有一些额外的问题。虽然 MLlib 使用的基础结构是纯 Python RDD 对象,但所有算法都是直接使用 Scala 执行的。

这意味着需要将 Python 对象转换为 Scala 对象,反之亦然,增加了内存使用量和一些额外的限制,稍后我们将介绍。

截至目前(Spark 2.x),基于 RDD 的 API 处于维护模式,在 Spark 3.0 中计划删除

DataFrame API 和 Spark ML

(JVM 执行,Python 代码仅限于驱动程序)

对于标准数据处理任务,这可能是最好的选择。由于 Python 代码大多限于驱动程序上的高级逻辑操作,因此 Python 和 Scala 之间应该没有性能差异。

单个例外是使用逐行Python UDF,它们比其Scala等效部分效率低得多。虽然有一些改进的机会(在Spark 2.0.0中已经有了大量开发),但最大的限制是内部表示(JVM)和Python解释器之间的完全往返。如果可能,您应该优先选择内置表达式的组合(example。Python UDF行为在Spark 2.0.0中得到了改善,但与本地执行相比仍然不够优化。

这个问题通过引入向量化UDF(SPARK-21190和更多扩展)得到了显著改善,它使用Arrow Streaming进行高效的数据交换,零拷贝反序列化。对于大多数应用程序,它们的次要开销可以被忽略。

还要确保避免在DataFramesRDDs之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说数据传输到和从Python解释器。

值得注意的是,Py4J调用具有相当高的延迟。这包括简单的调用,如:

from pyspark.sql.functions import col

col("foo")

通常情况下,这并不重要(开销是固定的且与数据量无关),但对于软实时应用程序,您可以考虑缓存/重用Java包装器。

GraphX和Spark数据集

截至目前为止(Spark 1.6 2.1),两者均未提供PySpark API,因此可以说PySpark比Scala差得多。

实际上,GraphX的开发几乎完全停止了,该项目目前处于维护模式,与之相关的JIRA票据已经关闭,标记为无法修复GraphFrames 库提供了一个具有Python绑定的替代图形处理库。

主观地说,在 Python 中并没有太多的静态类型化的Datasets的空间,即使有,当前的Scala实现也过于简单,无法提供与DataFrame相同的性能优势。

流式处理

从我目前看到的情况来看,我强烈建议使用Scala而不是Python。如果PySpark获得结构化流的支持,这种情况可能会改变,但现在Scala API似乎更加健壮、全面和高效。我的经验非常有限。
Spark 2.x中的结构化流似乎缩小了语言之间的差距,但现在仍处于早期阶段。尽管如此,RDD基础API已被引用为“传统流”在Databricks文档(访问日期2017-03-03)中,因此可以合理地预期进一步的统一努力。
非性能考虑
并非所有Spark功能都通过PySpark API公开。请确保检查您需要的部分是否已经实现,并尝试了解可能的限制。
当您使用MLlib和类似的混合上下文(请参见从任务调用Java/Scala函数)时,这尤其重要。公正地说,PySpark API的某些部分,如mllib.linalg,提供了比Scala更全面的方法集。
The PySpark API与其Scala版本密切相关,因此不完全符合Pythonic。这意味着虽然在语言之间进行映射相当容易,但同时,Python代码可能会更难理解。
相对于纯JVM执行而言,PySpark数据流程相对复杂。要推理或调试PySpark程序要困难得多。此外,至少需要对Scala和JVM有基本的了解。
持续向Dataset API转移,使用冻结RDD API为Python用户带来了机遇和挑战。虽然API的高级部分在Python中更容易暴露,但更高级的功能几乎不可能直接使用。
此外,原生Python函数在SQL世界中仍然是二等公民。希望随着Apache Arrow序列化(当前努力针对数据collection),这将在未来得到改善,但UDF serde是一个长期目标
对于严重依赖Python代码库的项目,纯Python替代方案(如DaskRay)可能是一个有趣的选择。

不必是一对另一个

Spark DataFrame(SQL,Dataset)API提供了一种优雅的方式来在PySpark应用程序中集成Scala / Java代码。您可以使用DataFrames将数据暴露给本机JVM代码并读取结果。我已经在其他地方解释了一些选项,并且您可以在如何在Pyspark中使用Scala类中找到Python-Scala往返的工作示例。
可以通过引入用户定义类型来进一步增强它(请参见如何为Spark SQL定义自定义类型架构?)。

问题在于提供的代码有什么问题

(免责声明:Pythonista观点。很可能我错过了一些Scala技巧)

首先,你的代码中有一个部分根本没有意义。如果你已经使用zipWithIndexenumerate创建了(key, value)对,那么创建字符串再立即拆分它的意义何在?flatMap不会递归地工作,因此您可以简单地产生元组并跳过以下的map

我发现另一个有问题的部分是reduceByKey。一般来说,如果应用聚合函数可以减少必须洗牌的数据量,则reduceByKey很有用。由于您只是连接字符串,因此这里没有任何可获得的利益。忽略低级别的东西,比如引用数量,您必须传输的数据量与groupByKey完全相同。

通常我不会过多纠结这个问题,但据我所知,这是你的Scala代码中的瓶颈。在JVM上连接字符串是一项相当昂贵的操作(例如:Is string concatenation in scala as costly as it is in Java?)。这意味着像这样的代码 _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)(在你的代码中等同于input4.reduceByKey(valsConcat))并不是一个好主意。
如果你想避免使用groupByKey,可以尝试使用aggregateByKeyStringBuilder。类似下面的代码应该能解决问题:
rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

我觉得这件事情值不值得大惊小怪还有待商榷。

考虑到上述情况,我已经按照以下方式重写了你的代码:

Scala

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

结果

local[6] 模式下(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行器有4GB内存,需要(n = 3):

  • Scala - 平均值:250.00秒,标准偏差:12.49
  • Python - 平均值:246.66秒,标准偏差:1.15

我相信大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。仅仅为了好玩,在这台机器上,以下是用 Python 编写的朴素单线程代码,可以在不到一分钟内完成相同的任务:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

1
这是同一个任务吗?最后的zip是不是有点懒,而且没有保存到文件中? - Dror Speiser

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接