Apache Spark 如何处理使用管道符分隔的 CSV 文件

4

我对Apache Spark非常不熟悉,正在尝试使用SchemaRDD处理我的管道分割文本文件。我在我的Mac上使用Scala 10独立安装了Spark 1.5.2。我有一个包含以下代表性数据的CSV文件,我正在尝试根据记录的第一个值(列)将其拆分成4个不同的文件。非常感谢您能提供任何帮助。

1|1.8|20140801T081137|115810740
2|20140714T060000|335|22159892|3657|0.00|||181
2|20140714T061500|335|22159892|3657|0.00|||157
2|20140714T063000|335|22159892|3657|0.00|||156
2|20140714T064500|335|22159892|3657|0.00|||66
2|20140714T070000|335|22159892|3657|0.01|||633
2|20140714T071500|335|22159892|3657|0.01|||1087
3|34|Starz
3|35|VH1
3|36|CSPAN: Cable Satellite Public Affairs Network
3|37|Encore
3|278|CMT: Country Music Television
3|281|Telehit
4|625363|1852400|Matlock|9212|The Divorce
4|625719|1852400|Matlock|16|The Rat Pack
4|625849|1846952|Smallville|43|Calling

3
欢迎来到 Stack Overflow。如果您提供自己的尝试,将有更好的机会获得答案。 - zero323
2个回答

14
注意:您的CSV文件中每行的字段数不相同 - 这不能直接解析为DataFrame。(SchemaRDD已更名为DataFrame。)如果您的csv文件格式正确,可以尝试以下操作:
在启动spark-shell或spark-submit时使用--packages com.databricks:spark-csv_2.10:1.3.0参数,以便轻松解析csv文件(请参见此处)。在Scala中,假定您的csv文件有标题 - 如果是,则更容易引用列:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", '|').load("/path/to/file.csv")
// assume 1st column has name col1
val df1 = df.filter( df("col1") === 1)  // 1st DataFrame
val df2 = df.filter( df("col1") === 2)  // 2nd DataFrame  etc... 

由于您的文件格式不规范,您需要逐行解析每个不同的行,例如,可以按照以下方式进行:

val lines = sc.textFile("/path/to/file.csv")

case class RowRecord1( col1:Int, col2:Double, col3:String, col4:Int)
def parseRowRecord1( arr:Array[String]) = RowRecord1( arr(0).toInt, arr(1).toDouble, arr(2), arr(3).toInt)

case class RowRecord2( col1:Int, col2:String, col3:Int, col4:Int, col5:Int, col6:Double, col7:Int)
def parseRowRecord2( arr:Array[String]) = RowRecord2( arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt, arr(4).toInt, arr(5).toDouble, arr(8).toInt)

val df1 = lines.filter(_.startsWith("1")).map( _.split('|')).map( arr => parseRowRecord1( arr )).toDF
val df2 = lines.filter(_.startsWith("2")).map( _.split('|')).map( arr => parseRowRecord2( arr )).toDF

嗨,KrisP,非常感谢您的帮助。我尝试了您的前几行代码,效果很好!我将尝试您的其余示例,然后根据COL0的值将具有不同列数的文件拆分为具有相同列数的多个文件... - Edward
嗨,KrisP,您是否也知道如何将输出保存到管道分隔文件中?我认为 df2.write.format("com.databricks.spark.csv").save("/Users/temp/parsed1.txt") 命令的输出默认为逗号分隔符并将其分解成多个文件。如果可能的话,我还尝试直接将结果写入Amazon Redshift以使工作流更加流畅。非常感谢您的帮助。 - Edward
3
只有当我使用双引号作为分隔符时,这个方法才对我起作用: .option("delimiter","|") 否则我会收到以下错误信息:java.lang.IllegalArgumentException: Delimiter cannot be more than one character - Brian

7
在PySpark中,命令如下:
df = spark.read.csv("filepath", sep="|")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接