How to convert a multi-line JSON file into an RDD with a single record

3
rdd = sc.textFile(path)  # path to a JSON or XML file
rdd.collect()

[u'{', u'    "glossary": {', u'        "title": "example glossary",', u'\t\t"GlossDiv": {', u'            "title": "S",', u'\t\t\t"GlossList": {', u'                "GlossEntry": {', u'                    "ID": "SGML",', u'\t\t\t\t\t"SortAs": "SGML",', u'\t\t\t\t\t"GlossTerm": "Standard Generalized Markup Language",', u'\t\t\t\t\t"Acronym": "SGML",', u'\t\t\t\t\t"Abbrev": "ISO 8879:1986",', u'\t\t\t\t\t"GlossDef": {', u'                        "para": "A meta-markup language, used to create markup languages such as DocBook.",', u'\t\t\t\t\t\t"GlossSeeAlso": ["GML", "XML"]', u'                    },', u'\t\t\t\t\t"GlossSee": "markup"', u'                }', u'            }', u'        }', u'    }', u'}', u'']

But my output should have everything on a single line:
{"glossary": {"title": "example glossary","GlossDiv": {"title": "S","GlossList":.....}}
3 Answers

4
I would suggest using Spark SQL's JSON support and then calling toJSON to save (see https://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets).
val input = sqlContext.jsonFile(path)
val output = input...
output.toJSON.saveAsTextFile(outputPath)

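However, if your JSON records cannot be parsed by Spark SQL, because they span multiple lines or for some other reason, we can take an example from the Learning Spark book (slightly biased as a co-author) and modify it to use wholeTextFiles: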
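// Jackson imports used below; the ScalaObjectMapper package path is the one
// from older jackson-module-scala releases and may differ in newer versions.
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

case class Person(name: String, lovesPandas: Boolean)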
// Read the input and throw away the file names
val input = sc.wholeTextFiles(inputFile).map(_._2)

// Parse it into a specific case class. We use mapPartitions because:
// (a) ObjectMapper is not serializable so we either create a singleton object encapsulating ObjectMapper
//     on the driver and have to send data back to the driver to go through the singleton object.
//     Alternatively we can let each node create its own ObjectMapper but that's expensive in a map
// (b) To solve for creating an ObjectMapper on each node without being too expensive we create one per
//     partition with mapPartitions. Solves serialization and object creation performance hit.
val result = input.mapPartitions(records => {
    // mapper object created on each executor node
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    // We use flatMap to handle errors
    // by returning an empty list (None) if we encounter an issue and a
    // list with one element if everything is ok (Some(_)).
    records.flatMap(record => {
      try {
        Some(mapper.readValue(record, classOf[Person]))
      } catch {
        case e: Exception => None
      }
    })
  }, true)
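// Serialize with a per-partition mapper; the one created above is out of scope here.
result.filter(_.lovesPandas).mapPartitions(records => {
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    records.map(mapper.writeValueAsString(_))
  })
  .saveAsTextFile(outputFile)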

And in Python:

from pyspark import SparkContext
import json
import sys

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Error usage: LoadJson [sparkmaster] [inputfile] [outputfile]")
        sys.exit(-1)
    master = sys.argv[1]
    inputFile = sys.argv[2]
    outputFile = sys.argv[3]
    sc = SparkContext(master, "LoadJson")
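    # wholeTextFiles yields (filename, contents) pairs; keep only the contents
    texts = sc.wholeTextFiles(inputFile).map(lambda kv: kv[1])
    # flatMap assumes the top level of each file is a JSON array of records
    data = texts.flatMap(lambda x: json.loads(x))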
    data.filter(lambda x: 'lovesPandas' in x and x['lovesPandas']).map(
        lambda x: json.dumps(x)).saveAsTextFile(outputFile)
    sc.stop()
    print("Done!")
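
The Scala version above drops records that fail to parse by returning None from flatMap. A rough Python equivalent might look like the sketch below. `safe_parse` and `load_records` are hypothetical helper names, not PySpark APIs, and the sketch assumes each input string is the full contents of one file holding either a single JSON object or an array of objects; `sc` is assumed to be an existing SparkContext.

```python
import json


def safe_parse(text):
    """Parse one file's contents; return a list of records, or [] on bad JSON.

    Hypothetical helper (not a PySpark API), meant to be passed to flatMap.
    """
    try:
        parsed = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return []
    # A top-level array yields one record per element; anything else is one record.
    return parsed if isinstance(parsed, list) else [parsed]


def load_records(sc, path):
    """Assumes `sc` is an existing SparkContext; returns an RDD of parsed records."""
    # wholeTextFiles yields (filename, contents) pairs; parse only the contents.
    return sc.wholeTextFiles(path).flatMap(lambda kv: safe_parse(kv[1]))
```

Wrapping a single top-level object in a list matters with flatMap: iterating a dict directly would emit its keys rather than the record itself.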

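If I have a multi-line file, I get an error: "Make sure that each line of the file (or each string in the RDD) is a valid JSON object or an array of JSON objects." - Kumar
Sorry about that, I'll update the answer with a solution for multi-line files. - Holden
Sure, I'll try to get it done tonight; I'm at work for the next few hours. - Holden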
1
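If the multi-line JSON file is very large and I want to read it in parallel, is that feasible with Spark? (If I understand correctly, this approach would have to process the JSON in a single thread.) - max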

1
Use sc.wholeTextFiles() instead.
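
As a minimal sketch of how that could produce the one-line output asked for in the question: wholeTextFiles yields (filename, contents) pairs, so each file's contents can be parsed and re-serialized without line breaks. The helper names `to_single_line` and `single_line_rdd` and the compact separators are illustrative choices, and `sc` is assumed to be an existing SparkContext.

```python
import json


def to_single_line(text):
    """Re-serialize a (possibly multi-line) JSON document as one compact line."""
    return json.dumps(json.loads(text), separators=(",", ":"))


def single_line_rdd(sc, path):
    """Return an RDD with one single-line JSON string per input file."""
    # wholeTextFiles gives (filename, contents); keep only the contents.
    return sc.wholeTextFiles(path).map(lambda kv: to_single_line(kv[1]))
```

Each file is still read and parsed as a whole by a single task, so any parallelism comes from having many files rather than from splitting one large file.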

0

I got the error: "Make sure that each line of the file (or each string in the RDD) is a valid JSON object or an array of JSON objects." - Kumar
