Pyspark - 将json字符串转换为DataFrame

31

我有一个test2.json文件,其中包含简单的JSON:

{  "Name": "something",  "Url": "https://stackoverflow.com",  "Author": "jangcy",  "BlogEntries": 100,  "Caller": "jangcy"}

我已经将文件上传到 Blob 存储中, 并从中创建了一个 DataFrame:

df = spark.read.json("/example/data/test2.json")

那么我就能够毫无问题地看到它:

df.show()
+------+-----------+------+---------+--------------------+
|Author|BlogEntries|Caller|     Name|                 Url|
+------+-----------+------+---------+--------------------+
|jangcy|        100|jangcy|something|https://stackover...|
+------+-----------+------+---------+--------------------+

第二种情况: 我的笔记本中确实声明了相同的json字符串:

newJson = '{  "Name": "something",  "Url": "https://stackoverflow.com",  "Author": "jangcy",  "BlogEntries": 100,  "Caller": "jangcy"}'

我可以打印它等等。但是现在如果我想从中创建一个DataFrame:

df = spark.read.json(newJson)

我遇到了“相对路径在绝对URI中”的错误:

'java.net.URISyntaxException: Relative path in absolute URI: {  "Name":%20%22something%22,%20%20%22Url%22:%20%22https:/stackoverflow.com%22,%20%20%22Author%22:%20%22jangcy%22,%20%20%22BlogEntries%22:%20100,%20%20%22Caller%22:%20%22jangcy%22%7D'
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 249, in json
    return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'java.net.URISyntaxException: Relative path in absolute URI: {  "Name":%20%22something%22,%20%20%22Url%22:%20%22https:/stackoverflow.com%22,%20%20%22Author%22:%20%22jangcy%22,%20%20%22BlogEntries%22:%20100,%20%20%22Caller%22:%20%22jangcy%22%7D'

我应该对newJson字符串进行其他转换吗?如果是,应该是什么?如果这太琐碎,请原谅,因为我对Python和Spark非常陌生。

我正在使用带有PySpark3内核的Jupyter笔记本。

提前致谢。


这显然是“摄取管道”帮助部分的一部分,因此在索引时间而不是查询时间重命名字段。 - Thierry Barnier
1个回答

66

您可以执行以下操作

newJson = '{"Name":"something","Url":"https://stackoverflow.com","Author":"jangcy","BlogEntries":100,"Caller":"jangcy"}'
df = spark.read.json(sc.parallelize([newJson]))
df.show(truncate=False)

这应该会提供

+------+-----------+------+---------+-------------------------+
|Author|BlogEntries|Caller|Name     |Url                      |
+------+-----------+------+---------+-------------------------+
|jangcy|100        |jangcy|something|https://stackoverflow.com|
+------+-----------+------+---------+-------------------------+

2
非常感谢你,Ramesh。运作得很好! :) - Jangcy
我有一个类似于列的dataframe中的相同json。我无法解析它。我该如何实现这一点。请帮忙。 - Naveen Srikanth
1
谢谢Ramesh。我正在尝试使用相同的逻辑。阅读400-500百万个JSON文件会对性能产生任何影响吗? - Naveen Srikanth
这取决于群集配置和您的代码。@NaveenSrikanth - Ramesh Maharjan
1
我使用了 spark.read.json(sc.wholeTextFiles("s3a://jsonparser_coding/").values()) ,Ramesh。我成功读取了 :) :) - Naveen Srikanth
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接