我可以使用spark-csv将以字符串表示的CSV文件读入Apache Spark吗?

15
我知道如何使用spark-csv将CSV文件读入Apache Spark,但我已经将CSV文件表示为字符串,并希望直接将此字符串转换为数据框。这种可能吗?

不,这是不可能的。在Python中,您可以使用Pandas、IO并将结果转换为Spark数据框。 - zero323
@zero323,您能否将您的评论发布为答案呢? - Glennie Helles Sindholt
这个应该标记为Scala吗(不是一个修辞问题)? - undefined
4个回答

20

Spark 3.x 更新 - 实际上更多是为了适应 Java 17 的新的 lines() 函数类型特性。

import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()

import spark.implicits._
import scala.collection.JavaConverters._

val csvData: Dataset[String] = ("""
                                  |id, date, timedump
                                  |1, "2014/01/01 23:00:01",1499959917383
                                  |2, "2014/11/31 12:40:32",1198138008843
      """.stripMargin.lines.toList.asScala).toDS()

val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()

更新:从Spark 2.2.x开始,终于有了使用Dataset进行正确操作的方法。
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()

import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
  """
    |id, date, timedump
    |1, "2014/01/01 23:00:01",1499959917383
    |2, "2014/11/31 12:40:32",1198138008843
  """.stripMargin.lines.toList).toDS()

val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()

旧版的Spark

实际上你是可以的,尽管它使用了库内部的功能并且没有广泛宣传。只需创建并使用自己的CsvParser实例即可。 以下是在Spark 1.6.0和spark-csv_2.10-1.4.0上适用的示例。

    import com.databricks.spark.csv.CsvParser

val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)


val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)

4

在Spark 2.2.0中,原先被接受的答案对我并不有效,不过它启示了我需要使用csvData.lines.toList

val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString

val csvList = streamString.lines.toList

spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvList.toDS())
  .as[SomeCaseClass]  

4
您可以使用例如scala-csv将字符串解析为csv格式:

val myCSVdata : Array[List[String]] = myCSVString.split('\n').flatMap(CSVParser.parseLine(_))

在这里,您可以进行更多的处理,对数据进行清洗,验证每行是否解析良好并具有相同数量的字段等...

然后,您可以将其转换为记录的RDD

val myCSVRDD : RDD[List[String]] = sparkContext.parallelize(msCSVdata)

在这里,您可以将字符串列表调整为一个case类,以更好地反映csv数据的字段。您可以从此示例中创建Person来获得一些灵感:

https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection

我省略了此步骤。

最后,您可以将RDD转换为DataFrame:

import spark.implicits._ myCSVDataframe = myCSVRDD.toDF()


链接是半破损的(页面锚点)。 - undefined

0
这是我最近遇到并成功解决的PySpark问题。 在这里,我正在使用控制台输出的dataframe.show输出,并使用Spark的CSV API创建一个dataframe。
由于已经有了Scala版本,所以这个PySpark版本与之略有不同。 我将其用于将impala/hive控制台输出转换为CSV,以进行单元测试,非常有用。
我使用了正则表达式...删除+-----+这种字符串。
 re.sub(r'\n[+-]+\n' , '\n', input_data)

import os
import re
import sys

from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
    .appName("String to CSV") \
    .getOrCreate()

# Input data as a string
input_data = """
+-----+------------------+-------+
|empid|empname           |salary|
|    1|    Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
""".replace("|\n","\n").replace("\n|","\n")

#remove +-----+-------+------+ from the string
input_data = re.sub(r'\n[+-]+\n' , '\n', input_data)
# Capture the input data as a string
df = spark.read.option("header","true").option("inferSchema","true").option("delimiter", "|").csv(spark.sparkContext.parallelize(input_data.split("\n")))
df.printSchema()
# Show the result CSV data
df.show()

完整的解释在我的文章中。

这段文本与代码完全不一致。这段代码是由ChatGPT(或类似的工具)生成的吗?还是这段代码是从哪里复制过来的? - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接