将 Spark DataSet 行数值映射到新的哈希列中

14

给定以下DataSet值作为inputData

column0 column1 column2 column3
A       88      text    99
Z       12      test    200
T       120     foo     12

在Spark中,计算新的hash列并将其附加到新的DataSet hashedData是一种有效的方法,其中hash被定义为对inputData的每个行值应用MurmurHash3。具体来说,hashedData如下:
column0 column1 column2 column3 hash
A       88      text    99      MurmurHash3.arrayHash(Array("A", 88, "text", 99))
Z       12      test    200     MurmurHash3.arrayHash(Array("Z", 12, "test", 200))
T       120     foo     12      MurmurHash3.arrayHash(Array("T", 120, "foo", 12))

如果需要更多具体信息,请告诉我。

非常感谢您的帮助!

2个回答

21

一种方法是使用withColumn函数:

import org.apache.spark.sql.functions.{col, hash}
dataset.withColumn("hash", hash(dataset.columns.map(col):_*))

1
谢谢!但我认为该行代码将列字符串名称传递给了MurmurHash3函数(即Array("column0", "column1", "column2", "column3"))。我会尝试找到一种在映射函数上下文中提取实际行值的方法。 - Jesús Zazueta
2
@JesúsZazueta 只是想说我认为他的解决方案并没有仅针对列名。此外,有一个很棒的函数可以将多个列的内容合并成一个新列:df.withColumn("concat", concat(df.columns.map(col):_*)) 它们还有其他方法,例如指定连接分隔符 - Lo-Tan

7
原来Spark已经在org.apache.spark.sql.functions包中实现了hash函数。
/**
 * Calculates the hash code of given columns, and returns the result as an int column.
 *
 * @group misc_funcs
 * @since 2.0
 */
@scala.annotation.varargs
def hash(cols: Column*): Column = withExpr {
  new Murmur3Hash(cols.map(_.expr))
}

在我的案例中,应用如下:

import org.apache.spark.sql.functions.{col, hash}

val newDs = typedRows.withColumn("hash", hash(typedRows.columns.map(col): _*))

我真的有很多关于Spark sql的知识需要学习 :(.

在这里留下来,以防其他人需要。谢谢!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接