答案
使用导入
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
您可以使用
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
val schema = df.schema
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
case y: StructField => y
})
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
直接使用。此外,您可以通过“ pimp my library”库模式使该方法可用(请参见我的SO帖子
如何在DataFrame上定义自定义方法的最佳方法?),这样您就可以调用。
val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )
编辑
替代方案1
使用稍微修改过的setNullableStateOfColumn
版本。
def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
val schema = df.schema
val newSchema = StructType(schema.map {
case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
})
df.sqlContext.createDataFrame( df.rdd, newSchema )
}
替代方案2
明确定义模式。(使用反射创建更通用的解决方案)
configuredUnitTest("Stackoverflow.") { sparkContext =>
case class Input(id:Long, var1:Int, var2:Int, var3:Double)
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._
val schema = StructType( Seq (
StructField( "id", LongType, true),
StructField( "var1", IntegerType, true),
StructField( "var2", IntegerType, true),
StructField( "var3", DoubleType, true)
))
val is: List[Input] = List(
Input(1110, 0, 1001,-10.00),
Input(1111, 1, 1001, 10.00),
Input(1111, 0, 1002, 10.00)
)
val rdd: RDD[Input] = sparkContext.parallelize( is )
val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
val inputDF = sqlContext.createDataFrame( rowRDD, schema )
inputDF.printSchema
inputDF.show()
}
input
应该改为Input
- case 类名应该大写(标题大小写) - Shafique Jamal