如何在Spark 2 Scala中将行转换为JSON

14

有没有一种简单的方法将给定的Row对象转换为JSON?

找到了关于将整个Dataframe转换为JSON输出的内容: Spark Row to JSON

但我只想将一个Row转换为JSON。以下是我尝试做的伪代码。

更精确地说,我在Dataframe中以JSON格式作为输入读取数据。 我正在生成一个新的输出,主要基于列,但是其中包括一个JSON字段,用于存储所有不适合列的信息。

我的问题是:编写此函数(convertRowToJson())的最简单方法是什么?

def convertRowToJson(row: Row): String = ???

def transformVenueTry(row: Row): Try[Venue] = {
  Try({
    val name = row.getString(row.fieldIndex("name"))
    val metadataRow = row.getStruct(row.fieldIndex("meta"))
    val score: Double = calcScore(row)
    val combinedRow: Row = metadataRow ++ ("score" -> score)
    val jsonString: String = convertRowToJson(combinedRow)
    Venue(name = name, json = jsonString)
  })
}

Psidom的解决方案:

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

仅在行只有一个级别时才有效,而不是嵌套行。这是模式:

StructType(
    StructField(indicator,StringType,true),   
    StructField(range,
    StructType(
        StructField(currency_code,StringType,true),
        StructField(maxrate,LongType,true), 
        StructField(minrate,LongType,true)),true))

也尝试了Artem的建议,但没有编译:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
  val sparkContext = sqlContext.sparkContext
  import sparkContext._
  import sqlContext.implicits._
  import sqlContext._
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataFrame = rowRDD.toDF() //XXX does not compile
  dataFrame
}
8个回答

23
你可以使用getValuesMap将行对象转换为Map,然后将其转换为JSON:
import scala.util.parsing.json.JSONObject
import org.apache.spark.sql._

val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")    
val row = df.first()          // this is an example row object

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

convertRowToJSON(row)
// res46: String = {"A" : 1, "B" : 2, "C" : 3}

9
更正:它实际上只适用于 Map / Struct 的第一层,对于嵌套的 Map,您只会看到值而不是键。 - Sami Badawi
1
@SamiBadawi,你能找到嵌套Map的解决方案吗? - Aravind Krishnakumar
1
我在嵌套方面也遇到了问题。 - Chris Olivier

5
我需要读取JSON输入并生成JSON输出。大多数字段都是单独处理的,但有几个JSON子对象只需要保留。
当Spark读取数据帧时,它会将记录转换为行(Row)。行(Row)是一种类似于JSON的结构。可以将其转换并写入JSON。
但我需要将一些子JSON结构提取出来作为一个新字段的字符串使用。
可以像这样完成:
dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address"))

location.address 是指传入的 JSON 数据帧中子 JSON 对象的路径。 address_json 是该对象的列名称,已转换为 JSON 的字符串版本。

to_json 已在 Spark 2.1 中实现。

如果使用 json4s 生成输出 JSON,则应将 address_json 解析为 AST 表示,否则输出的 JSON 将会对地址部分进行转义。


4

请注意,scala.util.parsing.json.JSONObject类已经被弃用,并且不支持null值。

@deprecated("此类将被移除。", "2.11.0")

"JSONFormat.defaultFormat无法处理null值"

https://issues.scala-lang.org/browse/SI-5092


谢谢Arnon。Scala的JSON支持现代化的讨论已经开始了。 - Sami Badawi

2

JSon有模式,但是Row没有模式,所以您需要在Row上应用模式并将其转换为JSon。以下是如何做到这一点。

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

def convertRowToJson(row: Row): String = {

  val schema = StructType(
      StructField("name", StringType, true) ::
      StructField("meta", StringType, false) ::  Nil)

      return sqlContext.applySchema(row, schema).toJSON
}

1
基本上,您可以拥有一个仅包含一行的数据框。因此,您可以尝试筛选初始数据框,然后将其解析为json。

感谢您的建议。我尝试了您的方法:def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = { val sparkContext = sqlContext.sparkContext import sparkContext._ import sqlContext.implicits._ import sqlContext._ val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) val dataFrame = rowRDD.toDF() //XXX does not compile dataFrame }但是它没有编译通过。 - Sami Badawi

1
如果您正在遍历数据框,则可以直接将数据框转换为包含JSON对象的新数据框,并进行迭代。
代码示例:

val df_json = df.toJSON


请修改您的问题,否则请使用评论。无论如何,请阅读社区规则。 - dpapadopoulos
是否可以设置列名?默认情况下为“value”,我想将其更改为“body”。 - Xavier John

1

我遇到了同样的问题,我的parquet文件具有规范模式(没有数组),我只想获取json事件。我按照以下方式操作,似乎可以正常工作(Spark 2.1):

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import scala.util.parsing.json.JSONFormat.ValueFormatter
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject}

def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = {
  schema.fields.map {
    field =>
      try{
        if (field.dataType.typeName.equals("struct")){
          field.name -> getValuesMap(row.getAs[Row](field.name),   field.dataType.asInstanceOf[StructType]) 
        }else{
          field.name -> row.getAs[T](field.name)
        }
      }catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}}
  }.filter(xy => xy._2 != null).toMap
}

def convertRowToJSON(row: Row, schema: StructType): JSONObject = {
  val m: Map[String, Any] = getValuesMap(row, schema)
  JSONObject(m)
}
//I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
val defaultFormatter : ValueFormatter = (x : Any) => x match {
  case s : String => "\"" + JSONFormat.quoteString(s) + "\""
  case jo : JSONObject => jo.toString(defaultFormatter)
  case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter)
  case ja : JSONArray => ja.toString(defaultFormatter)
  case other => other.toString
}

val someFile = "s3a://bucket/file"
val df: DataFrame = sqlContext.read.load(someFile)
val schema: StructType = df.schema
val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema))

0

我结合了Artem、KiranM和Psidom的建议,进行了大量的尝试和错误,并得出了这个解决方案,我已经测试过它适用于嵌套结构:

def row2Json(row: Row, sqlContext: SQLContext): String = {
  import sqlContext.implicits
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataframe = sqlContext.createDataFrame(rowRDD, row.schema)
  dataframe.toJSON.first
}

这个解决方案有效,但只能在驱动程序模式下运行。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接