我们是否应该像在训练前并行化序列一样并行化DataFrame?

15
考虑这里给出的代码,

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)

假设我们使用sqlContext.read()读取"dataframe"作为数据帧,我们是否仍需要执行类似以下操作的内容?
val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this

或者,如果传递了一个dataFrame,fit函数将自动处理并行计算/数据。
敬礼,
2个回答

21

DataFrame是一种分布式数据结构。它既不需要也不能进行并行化。仅使用SparkContext.parallelize方法来分发驱动器内存中存在的本地数据结构。您不应该使用它来分发大型数据集,更不用说重新分发RDDs或更高级别的数据结构(就像您在之前的问题中所做的那样)。

sc.parallelize(trainingData.collect()) 

如果你想在 RDD / DataframeDataset)之间进行转换,请使用专门设计用于此目的的方法:

  1. DataFrameRDD

  2. import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val df: DataFrame  = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
    val rdd: RDD[Row] = df.rdd
    
  3. RDDDataFrame

    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
    val df1: DataFrame = rdd.toDF
    // or
    val df2: DataFrame = spark.createDataFrame(rdd) // From 1.x use sqlContext
    

5
你可能需要了解RDD和DataFrame之间的区别以及如何在它们之间进行转换:Spark中DataFrame和RDD的区别 回答你的问题:DataFrame已经针对并行执行进行了优化。你不需要做任何事情,可以直接将它传递给任何spark估算器fit()方法。并行执行是在后台处理的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接