Spark:不聚合的转置DataFrame

13

我查看了很多在线问题,但它们似乎不能实现我想要的目标。

我正在使用Scala的Apache Spark 2.0.2。

我有一个数据框:

+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
|         1|  100|   0|   0|   0|   0|   0|
|         2|    0|  50|   0|   0|  20|   0|
|         3|    0|   0|   0|   0|   0|   0|
|         4|    0|   0|   0|   0|   0|   0|
+----------+-----+----+----+----+----+----+

我希望将其转换为

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
|val2|    0|  50|   0|   0|
|val3|    0|   0|   0|   0|
|val4|    0|   0|   0|   0|
|val5|    0|  20|   0|   0|
|val6|    0|   0|   0|   0|
+----+-----+----+----+----+

我尝试使用 pivot(),但是我无法得到正确的答案。 最终我通过循环遍历我的 val{x} 列,并按照以下方式进行旋转,但这证明非常缓慢。

val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
|         1|  100|
|         2|    0|
|         3|    0|
|         4|    0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
+----+-----+----+----+----+

然后对于每次迭代的 val{x} 使用 union() 方法与我的第一个数据框相结合。

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val2|    0|  50|   0|   0|
+----+-----+----+----+----+

有没有一种更有效的方法来进行转置,而我不想对数据进行聚合?

谢谢:)

4个回答

19

很遗憾,以下情况不适合使用Spark DataFrame:

  • 数据量不足以证明使用DataFrame的必要性。
  • 数据转置不可行。

需要记住的是,DataFrame在Spark中实现为分布式的行集合,每一行都存储和处理在一个节点上。

您可以通过使用pivot来在DataFrame上进行数据转置:

val kv = explode(array(df.columns.tail.map { 
  c => struct(lit(c).alias("k"), col(c).alias("v")) 
}: _*))

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id")
  .agg(first($"v"))
  .orderBy($"k")
  .withColumnRenamed("k", "vals")

但这只是一个没有实际应用的玩具代码。在实践中,它并不比收集数据更好:

val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) => {
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
  }
}

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)

spark.createDataFrame(sc.parallelize(rows), schema)

对于定义如下的DataFrame

val df = Seq(
  (1, 100, 0, 0, 0, 0, 0),
  (2, 0, 50, 0, 0, 20, 0),
  (3, 0, 0, 0, 0, 0, 0),
  (4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")

两者都能给你想要的结果:

+----+---+---+---+---+
|vals|  1|  2|  3|  4|
+----+---+---+---+---+
|val1|100|  0|  0|  0|
|val2|  0| 50|  0|  0|
|val3|  0|  0|  0|  0|
|val4|  0|  0|  0|  0|
|val5|  0| 20|  0|  0|
|val6|  0|  0|  0|  0|
+----+---+---+---+---+

话虽如此,如果您需要在分布式数据结构上进行高效的转置操作,您需要寻找其他解决方案。有许多数据结构,包括核心的CoordinateMatrixBlockMatrix,可以将数据分布在两个维度上,并且可以进行转置。


我是Scala和Spark的初学者,似乎在struct和explode方面存在错误。这是正确的吗? - Shuai Liu
@ShuaiLiu 你需要 import org.apache.spark.sql.functions._ - 7kemZmani

1
在Python中,可以简单地完成这项操作。我通常通过将Spark DataFrame转换为Pandas并使用transpose函数来实现。
```python spark_df.toPandas().T ```

由于当前的编写方式,您的答案不太清晰。请【编辑】以添加其他详细信息,以帮助他人了解如何回答所提出的问题。您可以在帮助中心找到有关如何编写良好答案的更多信息。 - Community
2
toPandas().T 可能适用于小型数据框,但对于较大的数据框,很可能会出现内存溢出错误。 - Victoria
这对于小的“DataFrame”来说是相当合理的方法。已点赞。祝贺您获得第一个积分! - WestCoastProjects

1

以下是Pyspark的解决方案: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html

以下是您问题的解决代码:

步骤1:选择列

d = df.select('val1','val2','val3','val4','val5','val6','segment_id')

这段代码可以形成如下的数据框架:
+----------+-----+----+----+----+----+----+
| val1|val2|val3|val4|val5|val6|segment_id
+----------+-----+----+----+----+----+----+
|  100|   0|   0|   0|   0|   0|    1     |   
|    0|  50|   0|   0|  20|   0|    2     |
|    0|   0|   0|   0|   0|   0|    3     |
|    0|   0|   0|   0|   0|   0|    4     |
+----------+-----+----+----+----+----+----+

步骤二:转置整个表格。
 d_transposed = d.T.sort_index() 

这段代码可以生成如下的数据框架:
+----+-----+----+----+----+----+-
|segment_id|    1|   2|   3|   4|
+----+-----+----+----+----+----+-
|val1      |  100|   0|   0|   0|
|val2      |    0|  50|   0|   0|
|val3      |    0|   0|   0|   0|
|val4      |    0|   0|   0|   0|
|val5      |    0|  20|   0|   0|
|val6      |    0|   0|   0|   0|
+----+-----+----+----+----+----+-

第三步:您需要将segment_id重命名为vals
d_transposed.withColumnRenamed("segment_id","vals")

+----+-----+----+----+----+----+-
|vals      |    1|   2|   3|   4|
+----+-----+----+----+----+----+-
|val1      |  100|   0|   0|   0|
|val2      |    0|  50|   0|   0|
|val3      |    0|   0|   0|   0|
|val4      |    0|   0|   0|   0|
|val5      |    0|  20|   0|   0|
|val6      |    0|   0|   0|   0|
+----+-----+----+----+----+----+-

以下是完整代码:

 d = df.select('val1','val2','val3','val4','val5','val6','segment_id')
 d_transposed = d.T.sort_index()
 d_transposed.withColumnRenamed("segment_id","vals")

-1

这应该是一个完美的解决方案。

val seq = Seq((1,100,0,0,0,0,0),(2,0,50,0,0,20,0),(3,0,0,0,0,0,0),(4,0,0,0,0,0,0))
val df1 = seq.toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
df1.show()

val schema = df1.schema

val df2 = df1.flatMap(row => {
  val metric = row.getInt(0)
  (1 until row.size).map(i => {
    (metric, schema(i).name, row.getInt(i))
  })
})

val df3 = df2.toDF("metric", "vals", "value")
df3.show()
import org.apache.spark.sql.functions._

val df4 = df3.groupBy("vals").pivot("metric").agg(first("value"))
df4.show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接