如何在CSV中使用双竖线作为分隔符?

7

Spark 1.5和Scala 2.10.6

我有一个数据文件,使用“¦¦”作为分隔符。我很难解析它以创建数据框架。可以使用多个分隔符来创建数据框架吗?代码可以处理单个断开的管道,但无法处理多个分隔符。

我的代码:

val customSchema_1 = StructType(Array(
    StructField("ID", StringType, true), 
    StructField("FILLER", StringType, true), 
    StructField("CODE", StringType, true)));

val df_1 = sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(customSchema_1)
    .option("delimiter", "¦¦")
    .load("example.txt")

样例文件:

12345¦¦  ¦¦10

1
你尝试过这个代码 ("\|\|") 吗?请查看链接:http://stackoverflow.com/a/36949918/647053 - Ram Ghadiyaram
我建议将其转换为以下代码:val text = sc.textFile("yourcsv.csv") val words = text.map(lines => lines.split("\\|\\|"))然后再用单个竖线构建CSV,并按照你的方法继续处理。 - Ram Ghadiyaram
@RamGhadiyaram 如果 OP 的数据包含任何双管道,那可能会有问题,我建议尝试在 spark.csv 分隔符选项上使用转义字符。 - evan.oman
@RamGhadiyaram 感谢您的建议!我尝试了.option("delimiter", "\¦\¦"),但是出现了不支持的特殊字符错误。 - SFatima
@RamGhadiyaram ¦ 不等同于 | - OneCricketeer
我只是举个例子。 - Ram Ghadiyaram
4个回答

6
我遇到了这个问题并找到了一个好的解决方案,我正在使用Spark 2.3,我有一种感觉它应该适用于所有Spark 2.2+,但尚未测试。它的工作原理是将|| 替换为tab,然后内置的CSV可以使用Dataset[String]。我使用制表符是因为我的数据中有逗号。
var df = spark.sqlContext.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", "\t")
  .csv(spark.sqlContext.read.textFile("filename")
      .map(line => line.split("\\|\\|").mkString("\t")))

希望这可以帮助其他人。
编辑:
自 Spark 3.0.1 起,此功能可直接使用。
示例:
val ds = List("name||id", "foo||12", "brian||34", """"cray||name"||123""", "cray||name||123").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

val csv = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", "||").csv(ds)
csv: org.apache.spark.sql.DataFrame = [name: string, id: string]

csv.show
+----------+----+
|      name|  id|
+----------+----+
|       foo|  12|
|     brian|  34|
|cray||name| 123|
|      cray|name|
+----------+----+

谢谢,这个完美地运行了(应该有更多的赞!) - Kumar Vaibhav

5
所以这里实际上发出的错误是:
java.lang.IllegalArgumentException: Delimiter cannot be more than one character: ¦¦

文档证实了这个限制条件,我检查了Spark 2.0的csv读取器,发现它有相同的要求。
基于所有这些考虑,如果你的数据足够简单,而不会包含“¦¦”这样的条目,那么我建议你这样加载数据:
scala> :pa
// Entering paste mode (ctrl-D to finish)
val customSchema_1 = StructType(Array(
    StructField("ID", StringType, true), 
    StructField("FILLER", StringType, true), 
    StructField("CODE", StringType, true)));

// Exiting paste mode, now interpreting.
customSchema_1: org.apache.spark.sql.types.StructType = StructType(StructField(ID,StringType,true), StructField(FILLER,StringType,true), StructField(CODE,StringType,true))

scala> val rawData = sc.textFile("example.txt")
rawData: org.apache.spark.rdd.RDD[String] = example.txt MapPartitionsRDD[1] at textFile at <console>:31

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val rowRDD = rawData.map(line => Row.fromSeq(line.split("¦¦")))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:34

scala> val df = sqlContext.createDataFrame(rowRDD, customSchema_1)
df: org.apache.spark.sql.DataFrame = [ID: string, FILLER: string, CODE: string]

scala> df.show
+-----+------+----+
|   ID|FILLER|CODE|
+-----+------+----+
|12345|      |  10|
+-----+------+----+

如何在Spark 2中保存为CSV文件时添加|^|分隔符 - Sudarshan kumar
1
有时我们需要在不知道列名的情况下加载数据。我认为上述方法在这种情况下会失败。 - donald

1
我们尝试以以下方式读取具有自定义分隔符和自定义列名称的数据框:
# Hold new column names saparately
headers ="JC_^!~_*>Year_^!~_*>Date_^!~_*>Service_Type^!~_*>KMs_Run^!~_*>

# '^!~_*>' This is field delimiter, so split string
head = headers.split("^!~_*>")

## Below command splits the S3 file with custom delimiter and converts into Dataframe
df = sc.textFile("s3://S3_Path/sample.txt").map(lambda x: x.split("^!~_*>")).toDF(head)

将头部作为参数传递给toDF(),为从具有自定义分隔符的文本文件创建的数据帧分配新列名。
希望这可以帮助。

0
从Spark2.8及以上版本开始,增加了对多字符分隔符的支持。 https://issues.apache.org/jira/browse/SPARK-24540 上述由@lockwobr提出的解决方案适用于Scala。如果您在Spark 2.8以下版本中工作,并且正在寻找PySpark的解决方案,您可以参考以下内容。
ratings_schema = StructType([
                                  StructField("user_id", StringType(), False)
                                , StructField("movie_id", StringType(), False)
                                , StructField("rating", StringType(), False)
                                , StructField("rating_timestamp", StringType(), True)
                                ])

    #movies_df = spark.read.csv("ratings.dat", header=False, sep="::", schema=ratings_schema)

    movies_df = spark.createDataFrame(
            spark.read.text("ratings.dat").rdd.map(lambda line: line[0].split("::")),
            ratings_schema)

我提供了一个例子,但你可以根据自己的逻辑进行修改。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接