以编程方式向Spark DataFrame添加多个列

3
我正在使用scala编写spark程序。
我有一个Dataframe,包含3列: ID、Time和RawHexdata。我有一个用户定义的函数,它将RawHexData扩展为X个新列。需要说明的是,对于每一行,X是相同的(列不变)。但是,在接收到第一条数据之前,我不知道这些列的内容。但是,一旦我得到了表头,我就可以推断出来。
我想要一个第二个Dataframe,包含以下列:Id、Time、RawHexData、NewCol1、...、NewCol3。
我能想到的“最简单”的方法是: 1. 将每一行反序列化为json(每种数据类型都可以在这里进行序列化) 2. 添加我的新列, 3. 从修改后的json中反序列化一个新的Dataframe,
然而,这似乎是一种浪费,因为它涉及到2个昂贵且冗余的json序列化步骤。我正在寻找一种更清晰的模式。
使用case类似乎是一个坏主意,因为我事先不知道列数或列名。

1
你能提供更多细节吗?例如RawHexdata中包含的数据。 - zero323
只有在满足一些条件之后,您才能应用.withColumn()函数。 - TheMP
Rawhexdata是由一批嵌入式设备发送的巨大二进制块。它包含数据,将被反序列化为其他平面数值数据:双精度浮点数,整数,复数等。我希望以后能够让分析员使用Sparksql查询这些数据。但是,当数据在二进制块中时,这是不可能的。因此,我编写了一个UDF“parseblob”,它接受一个二进制块并返回一个地图/JSON对象(我可以更改返回类型以适合解决方案)。我希望该映射的内容成为另一个表中的列,其中每行与原始原始数据相关联。 - eshalev
@niemand,withcolumn一次只允许一个列。有没有办法在不为每个添加的列重新解析整个blob的情况下使用withcolumn?(比如说我想添加3列)。如果可以的话,我可以轻松地通过重复调用withcolumn来添加多个列的函数。然而,我能想到的每个withcolumn的语法都需要对每行数据进行多次重新解析。我对Scala并不是很熟悉,也许有什么方法... - eshalev
2个回答

2
你可以通过操作行 RDD 来动态扩展你的 DataFrame,可以通过调用 dataFrame.rdd 获取。有了 Row 实例,你可以访问 RawHexdata 列并解析其中包含的数据。将新解析的列添加到结果 Row 中,就几乎解决了问题。将 RDD[Row] 转换回 DataFrame 唯一需要的是为新列生成模式数据。你可以通过在驱动程序上收集单个 RawHexdata 值,然后提取列类型来完成此操作。
以下代码示例说明了这种方法。
object App {

  case class Person(name: String, age: Int)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
    val dataFrame = input.df

    dataFrame.show()

    // create the extended rows RDD
    val rowRDD = dataFrame.rdd.map{
      row =>
        val blob = row(1).asInstanceOf[Int]
        val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3)
        Row.fromSeq(row.toSeq.init ++ newColumns)
    }

    val schema = dataFrame.schema

    // we know that the new columns are all integers
    val newColumns = StructType{
      Seq(new StructField("1", IntegerType), new StructField("2", IntegerType), new StructField("3", IntegerType))
    }

    val newSchema = StructType(schema.init ++ newColumns)

    val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema)

    newDataFrame.show()
  }
}

谢谢,虽然我不知道每个具体数字值的类型。但我可以添加一个“switch”并构建Seq函数。 - eshalev
完全正确,@eshalev。假设你的所有RawHexdata对象都包含相同的列,你可以收集一个RawHexdata对象并计算结果列的数据类型。 - Till Rohrmann

2

SELECT是你在不返回RDD的情况下解决它的好帮手。

case class Entry(Id: String, Time: Long)

val entries = Seq(
  Entry("x1", 100L),
  Entry("x2", 200L)
)

val newColumns = Seq("NC1", "NC2", "NC3")

val df = spark.createDataFrame(entries)
  .select(col("*") +: (newColumns.map(c => lit(null).as(c))): _*)

df.show(false)

+---+----+----+----+----+
|Id |Time|NC1 |NC2 |NC3 |
+---+----+----+----+----+
|x1 |100 |null|null|null|
|x2 |200 |null|null|null|
+---+----+----+----+----+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接