使用数据框中多个其他列的值来添加新列 - Spark/Scala

4
我是新手,对于Spark SQL和DataFrames并不熟悉。我有一个Dataframe,需要基于其他列的值添加一个新的列。我有一个Excel中的嵌套IF公式(用于向新列添加值),转换为编程术语后,大致如下:
if(k =='yes')
{
  if(!(i==''))
  {
    if(diff(max_date, target_date) < 0)
    {
      if(j == '')
      {
        "pending" //the value of the column
      }
      else {
        "approved" //the value of the column
      }
    }
    else{
      "expired" //the value of the column
    }
  }
  else{
    "" //the value should be empty
  }
}
else{
  "" //the value should be empty
} 
j,k是Dataframe中的另外两列。 我知道我们可以使用withColumnwhen基于其他列添加新的列,但我不确定如何使用这种方法实现上述逻辑。 有什么简单/有效的方法可以实现添加新列的上述逻辑吗? 感谢您的帮助。

max_date和target_date从哪里来? - soote
“Max_date”来自我获取数据框的表格。 “Target_date”是三列中的一列,即i、j、k。 - Hemanth
1个回答

6

首先,让我们简化那个if语句:

if(k == "yes" && i.nonEmpty)
  if(maxDate - targetDate < 0)
    if (j.isEmpty) "pending" 
    else "approved"
  else "expired"
else ""

现在有两种主要方法可以实现这一点:
  1. 使用自定义UDF
  2. 使用Spark内置函数:coalescewhenotherwise

自定义UDF

由于您的条件比较复杂,使用第二个方法会比较棘手。使用自定义UDF应该符合您的需求。
def getState(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =  
  if(k == "yes" && i.nonEmpty)
    if(maxDate - targetDate < 0)
      if (j.isEmpty) "pending" 
      else "approved"
    else "expired"
  else ""

val stateUdf = udf(getState _)
df.withColumn("state", stateUdf($"i",$"j",$"k",lit(0),lit(0)))

只需将lit(0)和lit(0)更改为您的日期代码,就可以为您工作。

使用Spark内置函数

如果您注意到性能问题,可以切换到使用coalesceotherwisewhen,它看起来像这样:

val isApproved = df.withColumn("state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) < 0) && $"j" =!= "", "approved").otherwise(null))
val isPending = isApproved.withColumn("state", coalesce($"state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) < 0) && $"j" === "", "pending").otherwise(null)))
val isExpired = isPending.withColumn("state", coalesce($"state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) >= 0), "expired").otherwise(null)))
val finalDf = isExpired.withColumn("state", coalesce($"state", lit("")))

我以前使用过自定义UDF,在处理大型输入源时没有问题,而且自定义UDF可以使代码更易读,特别是在这种情况下。


你好。在你的回答中,使用Spark内置函数,finalDF语句是什么意思?我们已经在不满足条件的地方放置了NULL,对吗? - Hemanth
@Hemanth,coalesce函数返回第一个非空值。因此,如果状态列的当前值在尝试了所有先前的条件后仍为空,则状态列的值将设置为空字符串。 - soote
我实际上可以用 NULL 替换空值。此外,可以检查 i、j 值是否为 NULL 而不是空值。我能否使用下面的代码实现相同的效果? - Hemanth
DF2 = DF.withColumn("state", when(col("k") === 1 && col("i") != "NULL" &&(ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(col("i").toString.trim)) < 0) && col("j") != "NULL", "approved").when(col("k") === 1 && col("i") != "NULL" && (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(col("i").toString.trim)) < 0) && col("j") == "NULL", "pending") .when(col("k") === 1 && col("i") != "NULL" && (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(col("i").toString.trim)) >= 0), "expired").otherwise(null)) - Hemanth
我在使用“$“col_name””符号时遇到了问题。我需要导入一个类来使用这个符号吗? - Hemanth
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接