用Python编写嵌套的Parquet格式文件

6

我有一个扁平的镶木地板文件,在其中一个varchar列中将JSON数据存储为字符串,我想将此数据转换为嵌套结构,即JSON数据变为嵌套的parquet。如果有帮助,我事先知道JSON的模式。

这是我到目前为止“完成”的内容:


构建示例数据

# load packages

import pandas as pd
import json
import pyarrow as pa
import pyarrow.parquet as pq

# Create dummy data

# dummy data with JSON as string
person_data = {'Name':  ['Bob'],
        'Age': [25],
        'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"     
        }

# from dict to panda df
person_df = pd.DataFrame.from_dict(person_data)

# from panda df to pyarrow table
person_pat = pa.Table.from_pandas(person_df)

# save as parquet file
pq.write_table(person_pat, 'output/example.parquet')

脚本提案

# load dummy data
sample = pa.parquet.read_table('output/example.parquet')

# transform to dict
sample_dict = sample.to_pydict()
# print with indent for checking
print(json.dumps(sample_dict, sort_keys=True, indent=4))
# load json from string and replace string
sample_dict['languages'] = json.loads(str(sample_dict['languages']))
print(json.dumps(sample_dict, sort_keys=True, indent=4))
#type(sample_dict['languages'])

# how to keep the nested structure when going from dict —> panda df —> pyarrow table?
# save dict as nested parquet...

以下是需要翻译的内容:

  1. 这种方法可行吗?有没有更好的优化方式?在字典、数据框和pa表格之间进行的所有转换都不太高效,所以我很乐意在这里学习。
  2. 如何在进行字典 -> 数据框转换时保留嵌套结构?还是根本不需要保留嵌套结构?
  3. 写入嵌套parquet文件的最佳方法是什么?我已经阅读了Nested data in Parquet with Python,并在其中提到了快速parquet用于读取,但缺乏写入能力-现在有没有可行的解决方案?

1
你能用PySpark做这个吗?我认为使用它会更容易。如果你想,我可以用PySpark编写一个解决方案,然后你可以决定是否使用它是个好主意。 - Oscar Lopez M.
在您的情况下,似乎不支持编写嵌套数据,请检查 https://issues.apache.org/jira/browse/ARROW-1644?我建议使用Pyspark。 - Swetha Shanmugam
嗨@OscarLopezM.,抱歉我离开了一段时间。非常感谢使用PySpark的解决方案。已经非常感激! - Stephan Claus
1个回答

4

如下所示,PySpark可以简单地完成这项任务。使用PySpark的主要优势是基础架构在数据增长时的可扩展性,但如果你不使用像Dask这样的框架,使用纯Python可能有问题,因为你需要更大的机器来运行它。

from pyspark.sql import HiveContext
hc = HiveContext(sc)

# This is a way to create a PySpark dataframe from your sample, but there are others 
nested_df = hc.read.json(sc.parallelize(["""
{'Name':  ['Bob'],
        'Age': [25],
        'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"     
        }
"""]))

# You have nested Spark dataframe here. This shows the content of the spark dataframe. 20 is the max number of rows to show on the console and False means don't cut the columns that don't fit on the screen (show all columns content)
nested_df.show(20,False)

# Writes to a location as parquet
nested_df.write.parquet('/path/parquet')

# Reads the file from the previous location
spark.read.parquet('/path/parquet').show(20, False)

这段代码的输出结果是:
+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+

+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+

回答你的问题:

  1. 我认为这更加高效,因为如果您可以在Spark中使用更多的执行器,那么无论您拥有多少数据都没有关系。
  2. 您可以看到,在加载parquet文件时,所有的字典和列表都得到了保留。
  3. 这取决于“最佳”的定义,但我认为这是一个不错的选择;)

嗨,Oscar,感谢你的回答。在Jupyter中运行时,我遇到了一个错误:NameError: name 'sc' is not defined。有什么建议如何解决吗? - Stephan Claus
嗨,Stephan,没问题。sc是指sparkContext对象。这取决于您使用的Spark版本,您可能必须像这样使用它或以不同的方式使用它。实际上,那部分只是为了模拟输入,因为我假设您有一个json文件而不是像那样的字符串。您能否尝试hc = HiveContext(spark.sparkContext)并查看是否有效?请注意,我正在用spark.sparkContext替换sc - Oscar Lopez M.
感谢您的迅速回复。我按照您的指示进行了替换,但现在出现了NameError: name 'spark' is not defined的错误。我已经在http://localhost:4040/environment/上运行了Spark,但不确定是否需要导入其他内容。 - Stephan Claus
另一种读取单行的方法可以在https://dev59.com/dqnka4cB1Zd3GeqPWvlL中看到。如果您对此有更多问题,请告诉我。谢谢 - Oscar Lopez M.
嗨,Oscar,感谢您的支持,我通过将以下行添加到脚本中使其工作:sc = SparkContext.getOrCreate() spark = SparkSession(sc) hc = HiveContext(sc) - Stephan Claus
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接