Spark Scala: converting CSV input to nested JSON

This is what my input data looks like:
20170101,2024270,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2024333,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170101,2023709,1000,1000,1000,1000,1000,1000,1000,2000,2000
20170201,1234709,1000,1000,1000,1000,1000,1000,1000,2000,2000

I want to convert it into a key/value RDD, where the key is an integer and the value is a JSON object, so that I can write it to Elasticsearch.

(
2024270, {
"metrics": {
  "date" : 20170101,
  "style_id" : 2024270,
  "revenue" : 1000,
  "list_count" : 1000,
  "pdp_count" : 1000,
  "add_to_cart_count" : 1000
 }
}
)

In Python I can do the same thing with the following code:

def format_metrics(line):
    # Fields are split on '^'; tokens[1] (style_id) becomes the key.
    tokens = line.split('^')
    try:
        return (tokens[1], {
                    'metrics': {
                        'date': tokens[0],
                        'mrp': float(tokens[2]),
                        'revenue': float(tokens[3]),
                        'quantity': int(tokens[4]),
                        'product_discount': float(tokens[5]),
                        'coupon_discount': float(tokens[6]),
                        'total_discount': float(tokens[7]),
                        'list_count': int(tokens[8]),
                        'add_to_cart_count': int(tokens[9]),
                        'pdp_count': int(tokens[10])
                    }
                }) if len(tokens) > 1 else ('', dict())
    except (IndexError, ValueError):
        # Assumed fallback: the snippet as posted had no except clause.
        return ('', dict())


metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)

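But I can't figure out how to do the same in Scala, and I'm new to Scala. I managed to get the output below, but I can't wrap the JSON in a "metrics" block. Any pointers would be very helpful.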

ordersDF.withColumn("key", $"style_id")
        .withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
        .select("key", "json")
        .show(false)

// Exiting paste mode, now interpreting.

+-------+-------------------------------------------------+
|key    |json                                             |
+-------+-------------------------------------------------+
|2024270|{"date":20170101,"style_id":2024270,"mrp":1000.0}|
|2024333|{"date":20170101,"style_id":2024333,"mrp":1000.0}|
|2023709|{"date":20170101,"style_id":2023709,"mrp":1000.0}|
|1234709|{"date":20170201,"style_id":1234709,"mrp":1000.0}|
+-------+-------------------------------------------------+

I want to convert this into a key/value RDD because I need to join it with another RDD (dimension data) that also holds JSON, and then write the output to ES. - Rajiv
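Rename the json column to metrics and call toJSON on the resulting DataFrame. - philantrovert
Thanks @philantrovert, that worked. - Rajiv
@RajivChodisetti, you should let philantrovert post that as an answer. - Ramesh Maharjan
That's fine. toJSON doesn't give exactly the required output anyway. - philantrovert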
1 Answer


I tried the approach @philantrovert suggested, and it worked.

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv")
ordersDF: org.apache.spark.sql.DataFrame = [date: int, style_id: int ... 9 more fields]

scala> :paste
// Entering paste mode (ctrl-D to finish)

ordersDF.withColumn("key", $"style_id")
        .withColumn("metrics", to_json(struct($"date", $"style_id", $"mrp")))
        .select("key", "metrics")
        .toJSON
        .show(false)

// Exiting paste mode, now interpreting.

+-----------------------------------------------------------------------------------+
|value                                                                              |
+-----------------------------------------------------------------------------------+
|{"key":2024270,"metrics":"{\"date\":20170101,\"style_id\":2024270,\"mrp\":1000.0}"}|
|{"key":2024333,"metrics":"{\"date\":20170101,\"style_id\":2024333,\"mrp\":1000.0}"}|
|{"key":2023709,"metrics":"{\"date\":20170101,\"style_id\":2023709,\"mrp\":1000.0}"}|
|{"key":1234709,"metrics":"{\"date\":20170201,\"style_id\":1234709,\"mrp\":1000.0}"}|
+-----------------------------------------------------------------------------------+
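Note that in the output above, metrics is a JSON string embedded in the outer object (hence the escaped quotes), not a nested object. A minimal sketch of one way to get real nesting is to alias the inner struct as metrics and serialize only once with to_json. The small DataFrame here is a hypothetical stand-in for ordersDF, and the column names and types are assumptions based on the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}

// In spark-shell, getOrCreate() simply returns the existing session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nested-json-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for ordersDF with three of its columns.
val sampleDF = Seq(
  (20170101, 2024270, 1000.0f),
  (20170201, 1234709, 1000.0f)
).toDF("date", "style_id", "mrp")

// Wrap the inner struct in an outer struct whose only field is "metrics",
// then serialize the outer struct once so the result is nested JSON.
val keyed = sampleDF.select(
  col("style_id").as("key"),
  to_json(
    struct(struct(col("date"), col("style_id"), col("mrp")).as("metrics"))
  ).as("json")
)
keyed.show(false)

// Key/value RDD of (style_id, JSON string), ready to join with another RDD.
val keyValueRdd = keyed.rdd.map(row => (row.getInt(0), row.getString(1)))
```

Because the serialization happens in a single to_json call, the json column should hold an object of the form {"metrics":{...}} with no escaping.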

I also tried another approach using the json4s library, and it did the conversion as well.

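import org.apache.spark.sql.Row
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Builds a (style_id, JSON string) pair with all fields nested under "metrics".
// As written, most fields read column 1 (style_id) or column 3 rather than
// their own column, which is why the output below repeats those values.
def convertRowToJSON(row: Row) = {

    val json =
    ("metrics" ->
      ("date" -> row(1).toString) ~
      ("style_id" -> row.getInt(1)) ~
      ("mrp" -> row.getFloat(2)) ~
      ("revenue" -> row.getFloat(3)) ~
      ("quantity" -> row.getInt(1)) ~
      ("product_discount" -> row.getFloat(3)) ~
      ("coupon_discount" -> row.getFloat(3)) ~
      ("total_discount" -> row.getFloat(3)) ~
      ("list_count" -> row.getInt(1)) ~
      ("add_to_cart_count" -> row.getInt(1)) ~
      ("pdp_count" -> row.getInt(1))
      )
    // The key is style_id; compact(render(...)) yields a single-line JSON string.
    (row.getInt(1), compact(render(json)).toString)
}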

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv").map(convertRowToJSON)
ordersDF: org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

scala> ordersDF.show(false)
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1     |_2                                                                                                                                                                                                                                                |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2024270|{"metrics":{"date":"2024270","style_id":2024270,"mrp":1000.0,"revenue":1000.0,"quantity":2024270,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024270,"add_to_cart_count":2024270,"pdp_count":2024270}}|
|2024333|{"metrics":{"date":"2024333","style_id":2024333,"mrp":1000.0,"revenue":1000.0,"quantity":2024333,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2024333,"add_to_cart_count":2024333,"pdp_count":2024333}}|
|2023709|{"metrics":{"date":"2023709","style_id":2023709,"mrp":1000.0,"revenue":1000.0,"quantity":2023709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":2023709,"add_to_cart_count":2023709,"pdp_count":2023709}}|
|1234709|{"metrics":{"date":"1234709","style_id":1234709,"mrp":1000.0,"revenue":1000.0,"quantity":1234709,"product_discount":1000.0,"coupon_discount":1000.0,"total_discount":1000.0,"list_count":1234709,"add_to_cart_count":1234709,"pdp_count":1234709}}|
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
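In the output above, date, quantity and the count fields all show the style_id value, because the function reads them by position from the wrong column. Reading each column by name avoids that kind of mix-up. The following is only a sketch: rowToKeyedJson is a hypothetical helper, the sample DataFrame stands in for ordersDF, and the column names and types are assumed from the Python version in the question, since revenue_schema itself is not shown.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.json4s.JObject
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Hypothetical helper: looks up every field by column name, so reordering
// the columns cannot silently shift values. Only five fields are shown;
// the remaining columns would follow the same pattern.
def rowToKeyedJson(row: Row): (Int, String) = {
  val metrics =
    ("date" -> row.getAs[Int]("date").toString) ~
    ("style_id" -> row.getAs[Int]("style_id")) ~
    ("mrp" -> row.getAs[Float]("mrp")) ~
    ("revenue" -> row.getAs[Float]("revenue")) ~
    ("quantity" -> row.getAs[Int]("quantity"))
  // Nest all fields under a single "metrics" key.
  val json: JObject = "metrics" -> metrics
  (row.getAs[Int]("style_id"), compact(render(json)))
}

// In spark-shell, getOrCreate() simply returns the existing session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("json4s-by-name-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for ordersDF (assumed column names and types).
val sample = Seq((20170101, 2024270, 1000.0f, 1000.0f, 1000))
  .toDF("date", "style_id", "mrp", "revenue", "quantity")

// Rows coming from a DataFrame carry their schema, so getAs by name works.
val keyedRdd = sample.rdd.map(rowToKeyedJson)
keyedRdd.collect().foreach(println)
```

Mapping over sample.rdd rather than the Dataset gives an RDD[(Int, String)] directly, which is the key/value shape the question asks for.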

json4s is quite nice too. Good answer. You can accept your own answer and close this question. - philantrovert
