如何高效地将多个JSON文件读入到Dataframe或JavaRDD中?

18

我可以使用以下代码读取单个json文件,但我需要读取多个json文件并将它们合并成一个数据框。我该如何做到这一点?

DataFrame jsondf = sqlContext.read().json("/home/spark/articles/article.json");

还是有办法将多个json文件读入JavaRDD,然后转换为Dataframe吗?

5个回答

21

要在Spark中读取多个输入,可以使用通配符。无论您是构建数据框架还是RDD,这都是正确的。

context.read().json("/home/spark/articles/*.json")
// or getting json out of s3
context.read().json("s3n://bucket/articles/201510*/*.json")

谢谢,我最终会使用s3,但现在只是本地测试。我已经将zero323的答案标记为正确,所以只能给你点赞了。 - Abu Sulaiman

15
你可以使用完全相同的代码来读取多个JSON文件。只需传递一个目录路径/带通配符的路径,而不是单个文件的路径。 DataFrameReader还提供了以下签名的json方法
json(jsonRDD: JavaRDD[String])

这可以用于解析已加载到JavaRDD中的JSON。


我认为json方法已经过时了,最好使用format("json")代替。你觉得呢? - eliasah
哦,那太好了。谢谢你!我想在发布问题之前需要学会更好地阅读文档。 - Abu Sulaiman

9

spark.read.json函数接受文件列表作为参数。

spark.read.json(List_all_json file)

这将读取列表中的所有文件并返回一个包含这些文件中所有信息的单个数据帧。

这里继续讨论一个旧话题,但是否有人能就给定>1个输入文件可以进行的可分区性/并行化优化发表评论呢?我有一个由10或12个约50GB的文件组成的500GB文件,如果小文件确实有助于进程,则我宁愿保留这些小文件。 - Buzz Moschetti

5
使用pyspark,如果您将所有json文件放在同一个文件夹中,则可以使用 df = spark.read.json('folder_path') 命令。该命令会加载文件夹内的所有json文件。
为了提高读取性能,建议您为dataframe提供模式(schema):
import pyspark.sql.types as T

billing_schema = billing_schema = T.StructType([
  T.StructField('accountId', T.LongType(),True),
  T.StructField('accountName',T.StringType(),True),
  T.StructField('accountOwnerEmail',T.StringType(),True),
  T.StructField('additionalInfo',T.StringType(),True),
  T.StructField('chargesBilledSeparately',T.BooleanType(),True),
  T.StructField('consumedQuantity',T.DoubleType(),True),
  T.StructField('consumedService',T.StringType(),True),
  T.StructField('consumedServiceId',T.LongType(),True),
  T.StructField('cost',T.DoubleType(),True),
  T.StructField('costCenter',T.StringType(),True),
  T.StructField('date',T.StringType(),True),
  T.StructField('departmentId',T.LongType(),True),
  T.StructField('departmentName',T.StringType(),True),
  T.StructField('instanceId',T.StringType(),True),
  T.StructField('location',T.StringType(),True),
  T.StructField('meterCategory',T.StringType(),True),
  T.StructField('meterId',T.StringType(),True),
  T.StructField('meterName',T.StringType(),True),
  T.StructField('meterRegion',T.StringType(),True),
  T.StructField('meterSubCategory',T.StringType(),True),
  T.StructField('offerId',T.StringType(),True),
  T.StructField('partNumber',T.StringType(),True),
  T.StructField('product',T.StringType(),True),
  T.StructField('productId',T.LongType(),True),
  T.StructField('resourceGroup',T.StringType(),True),
  T.StructField('resourceGuid',T.StringType(),True),
  T.StructField('resourceLocation',T.StringType(),True),
  T.StructField('resourceLocationId',T.LongType(),True),
  T.StructField('resourceRate',T.DoubleType(),True),
  T.StructField('serviceAdministratorId',T.StringType(),True),
  T.StructField('serviceInfo1',T.StringType(),True),
  T.StructField('serviceInfo2',T.StringType(),True),
  T.StructField('serviceName',T.StringType(),True),
  T.StructField('serviceTier',T.StringType(),True),
  T.StructField('storeServiceIdentifier',T.StringType(),True),
  T.StructField('subscriptionGuid',T.StringType(),True),
  T.StructField('subscriptionId',T.LongType(),True),
  T.StructField('subscriptionName',T.StringType(),True),
  T.StructField('tags',T.StringType(),True),
  T.StructField('unitOfMeasure',T.StringType(),True)
])

billing_df = spark.read.json('/mnt/billingsources/raw-files/202106/', schema=billing_schema)

在这份文档(https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html)中,我找不到用于加载JSON的第二个模式参数。你确定它存在吗? - Prometheus
1
@Prometheus 这里是文档 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.json.html?highlight=json#pyspark.sql.DataFrameReader.json - Camilo Soto

0

函数json(String... paths)接受可变参数。(文档)

因此,您可以像这样更改您的代码:

sqlContext.read().json(file1, file2, ...)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接