在Spark DataFrame中更改列的可空属性

47

我正在手动创建一个数据框以进行测试。创建它的代码是:

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))

所以结构看起来像这样:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

我想让这些变量都具有“nullable = true”的属性。如何从一开始就声明或在创建新数据框后切换它?


2
input 应该改为 Input - case 类名应该大写(标题大小写) - Shafique Jamal
https://medium.com/@pradipsudo/explore-nullable-property-of-columns-in-a-spark-data-frame-1d1b7b042adb - Pradip Sodha
从Spark 4.0.0开始,将添加StructType.toNullable,ArrayType.toNullable和MapType.toNullable方法。详情请参阅https://issues.apache.org/jira/browse/SPARK-45661。 - undefined
8个回答

53

答案

使用导入

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

您可以使用

/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is  either nullable or not
 */
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

直接使用。此外,您可以通过“ pimp my library”库模式使该方法可用(请参见我的SO帖子如何在DataFrame上定义自定义方法的最佳方法?),这样您就可以调用。
val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )

编辑

替代方案1

使用稍微修改过的setNullableStateOfColumn版本。

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

替代方案2

明确定义模式。(使用反射创建更通用的解决方案)

configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id:Long, var1:Int, var2:Int, var3:Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._


  // use this to set the schema explicitly or
  // use refelection on the case class member to construct the schema
  val schema = StructType( Seq (
    StructField( "id", LongType, true),
    StructField( "var1", IntegerType, true),
    StructField( "var2", IntegerType, true),
    StructField( "var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001,-10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] =  sparkContext.parallelize( is )
  val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame( rowRDD, schema ) 

  inputDF.printSchema
  inputDF.show()
}

那么没有办法对列进行整体重置吗?如果需要,我可以将列名抓取到列表中并循环遍历该列表。顺便说一句,“pimp my library” 真是太棒了! - J Calbreath
啊,现在我知道你的意思了。你可以通过StructTypecreateDataFrame指定一个模式。我会在我的答案中添加一个编辑。 - Martin Senne
所有这些都是为了实现在任何SQL引擎中通常是默认行为的功能:即一个字段可以包含空值? - Alexander Tronchin-James
我的观察是,这会在源RDD上创建一个逻辑计划,导致额外的处理 - 而且似乎被认为是一个操作,因为我的阶段现在停在createDataFrame行,而不是一些后续处理阶段。 - Stephen

42

如果需要原地更改数据框(dataframe),并且无法重新创建,则可以尝试以下方法:

.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

Spark 将认为该列可能包含 null,并将 nullability 设置为 true。 同时,您可以使用 udf 将值包装在 Option 中。 即使在流式情况下也可以正常工作。


7
有没有办法在结构化流数据帧中实现反向操作(将列设置为非空)? - redsk
9
好的!PySpark版本的代码是.withColumn("col_name", when(col("col_name").isNotNull(), col("col_name")).otherwise(lit(None))) - AltShift
“otherwise” 似乎不需要。它在这个答案中已经显示了。 - ZygD
2
这应该是被接受的答案。 - JBernardo
即使在流式处理的情况下也能正常工作。 对我来说,只有在使用foreachBatch sink时才能正常工作。对于其他的sink,它似乎可以工作,但实际上并不是这样——检查df模式表明nullable从false变为true,但某种方式列仍然表现得好像nullable=false一样。举个具体的例子,在更改后的列上调用to_avro(不提供模式给to_avro的唯一方法是spark 2.4)会产生直接值的avro字节,而不是表示在avro union中位置的初始int(即nullable=true的行为而不是nullable=false的行为)。 - oskarryn

17

虽然晚了点,但我想提供一个替代方案给那些来这里的人。你可以通过对代码进行以下修改,在一开始就自动使 DataFrameColumn 可为空:

case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
  .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
    input(Some(1111),Some(1),1001,10.00),
    input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema

这将产生:

root
 |-- id: long (nullable = true)
 |-- var1: integer (nullable = true)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]

基本上,如果你使用Some([element])None来声明一个字段为Option,那么该字段将是可空的。否则,该字段将不可空。希望这能帮到你!


10

更紧凑的设置所有列可为空参数的版本

可以使用_.copy(nullable = nullable)代替case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)。然后整个函数可以写成:

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}

4

感谢 Martin Senne。 只是一个小补充,在内部结构类型的情况下,您可能需要递归地设置可空,像这样:

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
    def set(st: StructType): StructType = {
      StructType(st.map {
        case StructField(name, dataType, _, metadata) =>
          val newDataType = dataType match {
            case t: StructType => set(t)
            case _ => dataType
          }
          StructField(name, newDataType, nullable = nullable, metadata)
      })
    }

    df.sqlContext.createDataFrame(df.rdd, set(df.schema))
  }

2

在您的case class中,只需使用java.lang.Integer而不是scala.Int.

case class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)

3
先生,我们为什么需要优先选择Java的整数类型而不是Spark呢? - BdEngineer

2

我来到这里寻找一个PySpark解决方案,但没有找到,所以我提供了以下内容:

from pyspark.sql.types import StructType, StructField

df = sqlContext.createDataFrame(
[(1, "a", 4), (3, "B", 5)], ("col1", "col2", "col3"))

df.show()
df.schema

+----+----+----+  
|col1|col2|col3|  
+----+----+----+  
|   1|   a|   4|  
|   3|   B|   5|  
+----+----+----+  

StructType(
    List(
        StructField(col1,LongType,true),
        StructField(col2,StringType,true),
        StructField(col3,LongType,true)
    )
)

schema = StructType()
for field in df.schema.fields:
    schema.add(StructField(field.name, field.dataType, False))
newdf = spark.createDataFrame(df.rdd, schema)

newdf.schema
StructType(
    List(
        StructField(col1,LongType,false),
        StructField(col2,StringType,false),
        StructField(col3,LongType,false)
    )
)

0

当您想要在Spark DataFrame中删除一列并创建一个新列时,您可以创建一个可空列,如下所示:

  1. df.withColumn("Employee_Name", when(lit('') == '', '').otherwise(lit(None)))

注意:如果您想创建一个字符串类型的列并使其可为空,则上述代码有效。

  1. df.withColumn("Employee_Name", when(lit('') == '', 0).otherwise(lit(None)))

注意:如果您想创建一个整数类型的列并使其可为空,则上述代码有效。


1
问题陈述是更改现有架构而不是创建新列。 - devilpreet
1
有时候删除列并创建一个带有更新模式的新列很容易,我在寻找答案时没有人发表过这种方法。因此,我认为这对某些人会有所帮助。 - Hemanth Vatti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接