在Scala中将数据框转换为JSON

3
假设我有一个单词计数的示例,其中我在一列中获取一个数据框作为单词,另一列中获取单词计数,我想将其收集并存储为JSON数组以在Mongo集合中使用。
eg for dataframe:
|Word  |  Count |
| abc   | 1   |
| xyz   |  23  |

我应该得到以下类似的JSON格式数据:
{words:[{word:"abc",count:1},{word:"xyz",count:23}]}

当我在数据框上尝试使用 .toJSON 方法并将值收集为列表,然后将其添加到数据框中时,结果存储在我的 MongoDB 中的是一组字符串而不是一组 JSON。
查询语句使用如下:
explodedWords1.toJSON.toDF("words").agg(collect_list("words")).toDF("words")

result : "{\"words\":[{\"word\":\"abc\",\"count\":1},{\"word\":\"xyz\",\"count\":23}]}"

我是Scala的新手。任何帮助都会很好。(如果不使用外部包会更好)。

1个回答

0

将数据帧中的数据存储到Mongo的绝佳方法是使用MongoDB Spark Connector(https://docs.mongodb.com/spark-connector/master/)

只需将"org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"添加到您的sbt依赖项并检查下面的代码即可

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession


val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/dbname")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/dbname")
  .getOrCreate()

import spark.implicits._

val explodedWords1 = List(
  ("abc",1),
  ("xyz",23)
).toDF("Word", "Count")

MongoSpark.save(explodedWords1.write.option("collection", "wordcount").mode("overwrite"))

然而,如果您想要将结果作为单个json 文件返回,则下面的脚本应该可以实现:

explodedWords1.repartition(1).write.json("/tmp/wordcount")

最后,如果您想在Scala中将JSON作为字符串列表使用,只需使用以下代码:

explodedWords1.toJSON.collect()

更新:
我没有看到你想要将所有记录聚合到一个字段 ("words")。
如果您使用下面的代码,那么上述所有三种方法仍然有效(将explodedWords1换成aggregated)。
import org.apache.spark.sql.functions._

val aggregated = explodedWords1.agg(
  collect_list(map(lit("word"), 'Word, lit("count"), 'Count)).as("words")
)

选项1:explodedWords1

explodedWords1

选项2:聚合

aggregated


选项2确实以所需格式输出。但是,当将其作为列添加到数据帧以写入Mongo DB时,出现“Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Dataset [words: array<map<string,string>>]”错误。 - Vignesh Ram
你在agg方法中使用了lit()吗? 否则我无法在Spark 2.3上重现这个错误。 我的代码: https://gist.github.com/TomLous/92e7f3faa9a09c0b8a713159b49785b9 - Tom Lous
谢谢Tom,我成功解决了这个错误。Scala很有趣... :) :) 现在,由于结果“aggregated”是一个包含映射的数组,我们称之为“words”,假设我想像下面这样将值存储在mongo中:{userid:“Vignesh”,words:<我们的聚合值>}... 我该怎么做呢? - Vignesh Ram
在agg之前添加一个.groupBy('userid) - Tom Lous
如果您喜欢这个答案,能否请您投票并接受它呢? - Tom Lous

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接