Pyspark:将schemaRDD保存为JSON文件

7
我希望找到一种从Apache Spark导出数据到JSON格式的其他工具的方法。我认为一定有一种非常简单的方法来实现这一点。
示例:我有以下JSON文件“jfile.json”:
{"key":value_a1, "key2":value_b1},
{"key":value_a2, "key2":value_b2},
{...}

每行文件都是一个JSON对象。这种类型的文件可以轻松地读入PySpark中。

jsonRDD = jsonFile('jfile.json')

然后调用jsonRDD.collect()方法,结果如下:

[Row(key=value_a1, key2=value_b1),Row(key=value_a2, key2=value_b2)]

现在我想将这些类型的文件保存为纯JSON文件。
我在Spark用户列表中找到了这篇文章:

http://apache-spark-user-list.1001560.n3.nabble.com/Updating-exising-JSON-files-td12211.html

声称使用
RDD.saveAsTextFile(jsonRDD) 

做完这个之后,文本文件看起来像:
Row(key=value_a1, key2=value_b1)
Row(key=value_a2, key2=value_b2)

即jsonRDD只是被简单地写入文件中,我期望在读取Spark用户列表条目后有一种“自动转换”回JSON格式的方式。我的目标是拥有一个看起来像开头提到的'jfile.json'文件。

我是否错过了一种非常明显且易于实现的方法?

我阅读了http://spark.apache.org/docs/latest/programming-guide.html,搜索了谷歌、用户列表和堆栈溢出以获取答案,但几乎所有答案都涉及将JSON读取和解析为Spark。我甚至买了《学习Spark》这本书,但那里的例子(第71页)只导致与上面相同的输出文件。

有人可以帮帮我吗?我觉得我只是在这里缺少一个小链接。

谢谢!

3个回答

5

1
这个很好用,而且只需要一行代码:res.toJSON().saveAsTextFile('/tmp/out/') - Smerity

1

我一直在Spark SQL的SQL控制台上直接使用org.apache.spark.sql.json。虽然这不是最有效的方法,也可能被认为是一种hack,但它可以完成工作。

CREATE TABLE jsonTable (
    key STRING,
    value STRING
)
USING org.apache.spark.sql.json
OPTIONS (
    PATH "destination/path"
);

创建表后,将数据从已注册的临时表或任何其他表中插入。
INSERT OVERWRITE TABLE jsonTable
SELECT * FROM tempTable;

注意:似乎这将启动一个hive map reduce作业,在提供的路径下创建多个文件部分。预计执行速度较慢。

注意:在表创建期间提供的路径位于hdfs上,而不是本地文件系统。

注意:我尚未尝试使用SQLContext.sql将其嵌入脚本中,但可能是可行的。

注意:从jsonTable表选择可能会由于序列化而失败。


1
我看不出有简单的方法来做到这一点。一个解决方案是将SchemaRDD的每个元素转换为String,最终得到一个RDD[String],其中每个元素都是该行的格式化JSON。所以,您需要编写自己的JSON序列化器。这是容易的部分。它可能不是超级快,但应该可以并行工作,并且您已经知道如何将RDD保存到文本文件中。
关键的洞察力在于,您可以通过调用schema方法从SchemaRDD中获取模式的表示形式。然后,由map提供给您的每个Row需要与模式一起递归遍历。实际上,这是平面JSON的串联列表遍历,但您还需要考虑嵌套的JSON。
其余的工作只涉及Python,我不会说,但我有这个Scala工作经验,如果有帮助的话。Scala代码变得复杂的部分实际上并不依赖于深入的Spark知识,因此如果你能理解基本的递归并且了解Python,你应该能够使其工作。对你来说,大部分工作是弄清如何在Python API中处理pyspark.sql.Rowpyspark.sql.StructType

一个警告:我相当确定我的代码在缺失值的情况下还不能正常工作--formatItem方法需要处理空元素。

编辑:Spark 1.2.0中,toJSON方法被引入到SchemaRDD中,使得这个问题变得简单 -- 参见@jegordon的答案。


Spark SQL中JSON支持的介绍 https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html - raittes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接