使用时间戳和日期类型将CSV读入Spark Dataframe

25

这是 CDH 配置了 Spark 1.6

我试图将这个假设的 CSV 导入到 Apache Spark 数据框中:

$ hadoop fs -cat test.csv
a,b,c,2016-09-09,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10,a,2016-11-11 09:09:10.0,a

我使用 databricks-csv jar。

val textData = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

我使用inferSchema为生成的DataFrame创建模式。printSchema()函数为上述代码提供以下输出:

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+

C3列的数据类型为字符串。我想将C3的数据类型更改为日期类型。为了将其转换为日期类型,我尝试了以下代码。

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

scala> textData.printSchema
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: timestamp (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+--------------------+---+--------------------+---+
| C0| C1| C2|                  C3| C4|                  C5| C6|
+---+---+---+--------------------+---+--------------------+---+
|  a|  b|  c|2016-09-09 00:00:...|  a|2016-11-11 00:00:...|  a|
|  a|  b|  c|2016-09-10 00:00:...|  a|2016-11-11 00:00:...|  a|
+---+---+---+--------------------+---+--------------------+---+
这段代码和第一个代码块唯一的区别是dateFormat选项行(我使用"yyyy-MM-dd"而不是"yyyy-MM-dd HH:mm:ss")。现在,我得到了C3和C5作为timestamps(C3仍然不是日期)。但对于C5,HH::mm:ss部分被忽略了,并显示为数据中的零。
理想情况下,我希望C3是日期类型,C5是时间戳类型,并且其HH:mm:ss部分不被忽略。 我当前的解决方案如下。我通过从我的DB并行提取数据来创建CSV。我确保将所有日期都作为时间戳提取(不理想)。因此,测试csv现在看起来像这样:
$ hadoop fs -cat new-test.csv
a,b,c,2016-09-09 00:00:00,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10 00:00:00,a,2016-11-11 09:09:10.0,a

这是我的最终工作代码:

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .schema(finalSchema)
    .option("nullValue", "null")
    .load("new-test.csv")

我在dateFormat中使用完整的时间戳格式"yyyy-MM-dd HH:mm:ss"。我手动创建了finalSchema实例,其中c3是日期,C5是时间戳类型(Spark SQL类型)。我使用schema()函数应用这些模式。输出如下所示:

scala> finalSchema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(C0,StringType,true), StructField(C1,StringType,true), StructField(C2,StringType,true), StructField(C3,DateType,true), StructField(C4,StringType,true), StructField(C5,TimestampType,true), StructField(C6,StringType,true))

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: date (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)


scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+

有没有更简单或开箱即用的方法将包含日期和时间戳类型的CSV文件解析为Spark数据框架?

相关链接:
http://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options
https://github.com/databricks/spark-csv

2个回答

5

这并不是一种优雅的方法,但您可以按照以下方式将时间戳转换为日期(请检查最后一行):

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")
    .withColumn("C4", expr("""to_date(C4)"""))

5

如果在非平凡情况下使用推断选项,它可能不会返回预期结果。如您在InferSchema.scala中所见:

if (field == null || field.isEmpty || field == nullValue) {
  typeSoFar
} else {
  typeSoFar match {
    case NullType => tryParseInteger(field)
    case IntegerType => tryParseInteger(field)
    case LongType => tryParseLong(field)
    case DoubleType => tryParseDouble(field)
    case TimestampType => tryParseTimestamp(field)
    case BooleanType => tryParseBoolean(field)
    case StringType => StringType
    case other: DataType =>
      throw new UnsupportedOperationException(s"Unexpected data type $other")

它只尝试将每个列与时间戳类型匹配,而不是日期类型,因此这种情况下的“开箱即用解决方案”不可行。但通过我的经验,“更容易”的解决方案是直接使用所需类型定义模式,这将避免推断选项设置一种仅针对RDD评估而不是整个数据匹配的类型。你的最终模式是一个高效的解决方案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接