在Spark DataFrame中添加一个列并为其计算一个值

4
我有一个包含经纬度列的CSV文档要加载到SQLContext中。
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("delimiter","\t").schema(customSchema).load(inputFile);

CSV示例

metro_code, resolved_lat, resolved_lon
602, 40.7201, -73.2001

我正在尝试找出最佳的方法来添加新列并计算每行的GeoHex。使用geohex包哈希纬度和经度非常容易。我认为需要运行parallelize方法或者像一些例子中那样将函数传递给withColumn。


1个回答

12

将所需的函数用UDF包装起来即可解决问题:

import org.apache.spark.sql.functions.udf
import org.geohex.geohex4j.GeoHex

val df = sc.parallelize(Seq(
  (Some(602), 40.7201, -73.2001), (None, 5.7805, 139.5703)
)).toDF("metro_code", "resolved_lat", "resolved_lon")

def geoEncode(level: Int) = udf(
  (lat: Double, long: Double) => GeoHex.encode(lat, long, level))

df.withColumn("code", geoEncode(9)($"resolved_lat", $"resolved_lon")).show
// +----------+------------+------------+-----------+
// |metro_code|resolved_lat|resolved_lon|       code|
// +----------+------------+------------+-----------+
// |       602|     40.7201|    -73.2001|PF384076026|
// |      null|      5.7805|    139.5703|PR081331784|
// +----------+------------+------------+-----------+

我无法解决这个错误 value $ is not a member of StringContext,谷歌搜索也没有找到任何有用的信息。我需要查找 Scala 文档中关于 $ 的内容。 - jspooner
1
这对我有用:df.withColumn("gh11", geoEncode(11)(df("resolved_lat"),df("resolved_lon"))).show - jspooner
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Iain
你需要使用 sqlContext 来进行 toDF 操作,所以它在内部是隐式存在的 :) 关于 StringToColumn$ 方法,但假设这只是一个惯例,就像 sc 代表 SparkContext 一样。 - zero323
@zero323 非常感谢! - Anbarasu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接