Spark:将collect(),take()和show()转换为DF后输出的差异

19

我正在使用Spark 1.5。

我有一个包含30个id的列,我从数据库中加载这些id作为整数

val numsRDD = sqlContext
     .table(constants.SOURCE_DB + "." + IDS)
     .select("id")
     .distinct
     .map(row=>row.getInt(0))

这是numsRDD的输出:

numsRDD.collect.foreach(println(_))

643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608

接下来,我想生成所有包含三个元素的组合,针对那些ids, 然后将每个组合保存为一个形如: < tripletID: String, triplet: Array(Int)> 的元组并将其转换为一个数据框,我会按照以下方式进行:

// |combinationsDF| = 4060 combinations
val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","), 
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")

当我完成这个步骤后,我会尝试打印一些combinationsDF的内容,以确保一切都按预期进行。所以我尝试了这个:

combinationsDF.show

返回:

+--------------------+--------------------+
|           tripletID|             triplet|
+--------------------+--------------------+
|,37136455,3758227...|[32776244, 371364...|
|,37136455,3761667...|[32776244, 371364...|
|,32776244,3713645...|[31723487, 327762...|
|,37136455,3757869...|[32776244, 371364...|
|,32776244,3713645...|[31598495, 327762...|
|,37136455,3760089...|[32776244, 371364...|
|,37136455,3764849...|[32776244, 371364...|
|,37136455,3764450...|[32776244, 371364...|
|,37136455,3747130...|[32776244, 371364...|
|,32981901,3713645...|[32776244, 329819...|
|,37136455,3761810...|[32776244, 371364...|
|,34213505,3713645...|[32776244, 342135...|
|,37136455,3726034...|[32776244, 371364...|
|,37136455,3772035...|[32776244, 371364...|
|2776244,37136455...|[643761, 32776244...|
|,37136455,3764777...|[32776244, 371364...|
|,37136455,3760898...|[32776244, 371364...|
|,32879386,3713645...|[32776244, 328793...|
|,32776244,3713645...|[31104189, 327762...|
|,32776244,3713645...|[30736590, 327762...|
+--------------------+--------------------+
only showing top 20 rows

显而易见,每个tripletID第一个元素都缺失了。所以,为了确保100%的准确性,我按照以下方式使用take(20)

combinationsDF.take(20).foreach(println(_))

其返回一个更详细的表示,如下所示:

[,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]

所以现在我确信tripletID的第一个id因为某些原因被弃用了。但是,如果我尝试使用collect而不是take(20)

combinationsDF.collect.foreach(println(_))

一切都会再次变得好起来(!!!):

[32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
...

1. 在将组合数组并行化为RDD之前,我已经详尽查询了步骤,一切都没问题。 2. 我也在parallelize应用后立即打印了输出,并且再次一切正常。 3. 问题似乎与将numsRDD转换为DF有关,尽管我已经尽力解决它。 4. 我也无法使用相同的代码片段和模拟数据重现这个问题。

因此,首先:是什么导致了这个问题?其次:如何解决它?


3
能否为我们创建一个本地运行的 [MCVE],这将非常有帮助。 - Yuval Itzchakov
好的,我会尽快尝试。问题是我无法在另一个项目中重现这样的问题... - Christos Hadjinikolis
我添加了原始输入的输出,这应该足以重现问题。 - Christos Hadjinikolis
2个回答

7
  1. df.show() 仅显示内容。

例如:

df.show()
Out[11]: 
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
  1. df.collect() shows content and structure/metadata.e.g.

    df.collect()
    Out[11]:
    [Row(age=None, name=u'Michael'),
    Row(age=30, name=u'Andy'),
    Row(age=19, name=u'Justin')]
    
    1. df.take(some number) can be used to shows content and structure/metadata for a limited number of rows for a very large dataset. note it flattens out the data and shows in a single row.

例如,只查看数据框的前两行:

df.take(2)
Out[13]: 
[Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy')]

2

我建议您检查原始的numsRDD,看起来可能包含空字符串或空值。以下方法对我有效:

scala> val numsRDD = sc.parallelize(0 to 30)
numsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> :pa
// Entering paste mode (ctrl-D to finish)

val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","),
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")

// Exiting paste mode, now interpreting.

combinationsDF: org.apache.spark.sql.DataFrame = [tripletID: string, triplet: array<int>]

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows

我能想到的唯一问题就是mkString函数不能按照你的预期工作。尝试使用字符串插值(也不需要重新创建List):
val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map{case List(a,b,c) => (
        s"$a,$b,$c", 
        Array(a,b,c))}
  .toDF("tripletID","triplet")

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows

1
谢谢,但我已经这样做了,并且结果应该是正确的。此外,当我使用 collect 时,问题就“消失了”。 - Christos Hadjinikolis
1
嗯,我会继续查找。你能提供 numsRDD 上的 foreach{println} 吗? - evan.oman
1
尽快完成 - 目前正在通勤中 :-) - Christos Hadjinikolis
很遗憾,我不能在家里尝试这个(刚刚下班)。我需要到公司才能访问集群。我们必须等待。不过我已经使用.count计算了项目数量,它们是30个。我还检查了服务器上表格的内容,看起来没问题...不幸的是,我要等到周一回去上班才能再次尝试。 - Christos Hadjinikolis
1
哈,听起来不错。我想我们都得在那之前等待悬疑的结局。 - evan.oman
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接