我知道如何使用spark-csv将CSV文件读入Apache Spark,但我已经将CSV文件表示为字符串,并希望直接将此字符串转换为数据框。这种可能吗?
Spark 3.x 更新 - 实际上更多是为了适应 Java 17 的新的 lines() 函数类型特性。
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
import spark.implicits._
import scala.collection.JavaConverters._
val csvData: Dataset[String] = ("""
|id, date, timedump
|1, "2014/01/01 23:00:01",1499959917383
|2, "2014/11/31 12:40:32",1198138008843
""".stripMargin.lines.toList.asScala).toDS()
val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
"""
|id, date, timedump
|1, "2014/01/01 23:00:01",1499959917383
|2, "2014/11/31 12:40:32",1198138008843
""".stripMargin.lines.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.show()
frame.printSchema()
旧版的Spark
实际上你是可以的,尽管它使用了库内部的功能并且没有广泛宣传。只需创建并使用自己的CsvParser实例即可。 以下是在Spark 1.6.0和spark-csv_2.10-1.4.0上适用的示例。
import com.databricks.spark.csv.CsvParser
val csvData = """
|userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
|1,1,user1,m1,l1,mr
|2,2,user2,m2,l2,mr
|3,3,user3,m3,l3,mr
|""".stripMargin
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
.withUseHeader(true)
.withInferSchema(true)
val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
在Spark 2.2.0中,原先被接受的答案对我并不有效,不过它启示了我需要使用csvData.lines.toList
val fileUrl = getClass.getResource(s"/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString
val csvList = streamString.lines.toList
spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(csvList.toDS())
.as[SomeCaseClass]
val myCSVdata : Array[List[String]] =
myCSVString.split('\n').flatMap(CSVParser.parseLine(_))
在这里,您可以进行更多的处理,对数据进行清洗,验证每行是否解析良好并具有相同数量的字段等...
然后,您可以将其转换为记录的RDD
:
val myCSVRDD : RDD[List[String]] = sparkContext.parallelize(msCSVdata)
在这里,您可以将字符串列表调整为一个case类,以更好地反映csv数据的字段。您可以从此示例中创建Person
来获得一些灵感:
我省略了此步骤。
最后,您可以将RDD转换为DataFrame:
import spark.implicits._
myCSVDataframe = myCSVRDD.toDF()
re.sub(r'\n[+-]+\n' , '\n', input_data)
import os
import re
import sys
from pyspark.sql import SparkSession
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
.appName("String to CSV") \
.getOrCreate()
# Input data as a string
input_data = """
+-----+------------------+-------+
|empid|empname |salary|
| 1| Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
""".replace("|\n","\n").replace("\n|","\n")
#remove +-----+-------+------+ from the string
input_data = re.sub(r'\n[+-]+\n' , '\n', input_data)
# Capture the input data as a string
df = spark.read.option("header","true").option("inferSchema","true").option("delimiter", "|").csv(spark.sparkContext.parallelize(input_data.split("\n")))
df.printSchema()
# Show the result CSV data
df.show()