使用PySpark如何将JSON文件读取为Pyspark数据框架?

5

如何使用PySpark将以下JSON结构读取为spark dataframe?

我的JSON结构

{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}

我已经尝试过以下方法:
df = spark.read.json('simple.json');

我希望将a、b、c作为列,相应的值作为行输出。谢谢。
3个回答

15

JSON字符串变量

如果你有作为变量的JSON字符串,那么你可以执行以下操作:

simple_json = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
rddjson = sc.parallelize([simple_json])
df = sqlContext.read.json(rddjson)

from pyspark.sql import functions as F
df.select(F.explode(df.results).alias('results')).select('results.*').show(truncate=False)

这将为您提供

+---+---+----+
|a  |b  |c   |
+---+---+----+
|1  |2  |name|
|2  |5  |foo |
+---+---+----+

将JSON字符串作为文件中的单独行(SparkContext和SQLContext)

如果您有一个文件中的单独行JSON字符串,则可以像上面那样使用SparkContext将其读入rdd [string],然后其余过程与上述相同。

rddjson = sc.textFile('/home/anahcolus/IdeaProjects/pythonSpark/test.csv')
df = sqlContext.read.json(rddjson)
df.select(F.explode(df['results']).alias('results')).select('results.*').show(truncate=False)

将Json字符串作为文件中的单独行(仅适用于sqlContext)

如果您有一个文件中的Json字符串是单独一行,那么您可以只使用sqlContext。但是,这个过程比较复杂,因为您必须为其创建模式

df = sqlContext.read.text('path to the file')

from pyspark.sql import functions as F
from pyspark.sql import types as T
df = df.select(F.from_json(df.value, T.StructType([T.StructField('results', T.ArrayType(T.StructType([T.StructField('a', T.IntegerType()), T.StructField('b', T.IntegerType()), T.StructField('c', T.StringType())])))])).alias('results'))
df.select(F.explode(df['results.results']).alias('results')).select('results.*').show(truncate=False)

这将会给你与上述结果相同的结果。

我希望这个答案能够有所帮助。


在经过5个小时的尝试后,我只能使用pandas找到解决方案,而无法使用pyspark。感谢您的帮助。不过,使用模式来读取它还是相当棘手的。 - Ajak6

0
!pip install findspark
!pip install pyspark
import findspark
import pyspark
findspark.init()
sc = pyspark.SparkContext.getOrCreate()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

让我们生成自己的JSON数据,这样我们就不必访问文件系统了。

stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

然后创建 DataFrame

swimmersJSON = spark.read.json(stringJSONRDD)

创建临时表
swimmersJSON.createOrReplaceTempView("swimmersJSON")

希望这能对你有所帮助。如果需要完整的代码,你可以参考这个GitHub仓库

0
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
json_data = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
json_rdd = sc.parallelize([json_data])
df = spark.read.json(json_rdd)
df =df.withColumn("results", explode(df.results)).select( 
                         col("results.a").alias("a"),
                         col("results.b").alias("b"),
                         col("results.c").alias("c") ) 
df.show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接