从CSV文件创建Spark数据集

11

我想从一个简单的CSV文件创建一个Spark Dataset。以下是CSV文件的内容:

name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"

以下是创建数据集的代码:

var location = "s3a://path_to_csv"

case class City(name: String, state: String, number_of_people: Long)

val cities = spark.read
  .option("header", "true")
  .option("charset", "UTF8")
  .option("delimiter",",")
  .csv(location)
  .as[City]

这是错误信息:"无法将number_of_people从字符串转换为bigint,因为可能会截断"

Databricks在这篇博客文章中讨论了创建数据集和此特定错误消息。

编码器会急切地检查您的数据是否符合预期的模式, 在您尝试错误地处理TB级数据之前提供有用的错误信息。例如,如果我们尝试使用太小的数据类型,导致转换为对象会截断(即numStudents大于一个字节,其最大值为255),分析器将发出AnalysisException错误。

我正在使用Long类型,所以我没想到会看到这个错误消息。

2个回答

22
使用模式推断:
val cities = spark.read
  .option("inferSchema", "true")
  ...

或提供模式:

val cities = spark.read
  .schema(StructType(Array(StructField("name", StringType), ...)

或者转换类型:

val cities = spark.read
  .option("header", "true")
  .csv(location)
  .withColumn("number_of_people", col("number_of_people").cast(LongType))
  .as[City]

3
使用您的案例类如下: case class City(name: String, state: String, number_of_people: Long), 您只需要一行代码。
private val cityEncoder = Seq(City("", "", 0)).toDS

然后你编写代码

val cities = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv(location)
.as[City]

将会自动工作。

这是官方来源[http://spark.apache.org/docs/latest/sql-programming-guide.html#overview][1]


2
请问您能否明确说明在创建数据框和数据集的代码中,如何使用这个cityEncoder? - Ozgun
你是否意味着在转换为数据集之前使用编码器指定数据框的模式? - Ozgun

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接