如何将一个巨大的pandas数据框保存到HDFS?

13

我正在使用pandas和Spark数据框架。数据框架总是非常大(>20 GB),标准的Spark函数对于这些大小不足。目前,我将我的pandas数据帧转换为Spark数据帧,如下所示:

dataframe = spark.createDataFrame(pandas_dataframe)  

我进行这种转换是因为使用Spark将数据框写入HDFS非常容易:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")

但是对于超过2GB的数据框,转换会失败。 如果我将Spark DataFrame转换为Pandas,则可以使用pyarrow:


// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)

这是从spark到pandas的快速转换,也适用于大于2 GB的数据帧。我还没有找到另一种方法来实现相反的过程,即将pandas数据帧转换为spark,需要使用pyarrow。问题在于我真的找不到如何将pandas数据帧写入到hdfs。

我的pandas版本:0.19.0


你遇到了什么错误? 你确定应用程序是在写入时失败的,还是在之前(在数据框转换期间)出现了问题? - Edge7
它因为Java堆空间有限而导致内存不足异常,createDataFrame方法正在Java堆上构建一个字节数组。为了解决这个问题,我们需要使用pyarrow解决方案。如描述的那样,它已经完美地实现了Spark到Pandas的转换。但我还需要将Pandas转换为Spark,因为我找不到一种直接将Pandas保存到HDFS或Hive的方法。 - Mulgard
只是好奇,这么大的数据量,为什么不直接将数据写入数据库呢?例如Postgres,如果您仍然希望编写Python或C代码在数据库中进行操作。 - ely
一种可行的方法是从大数据框中创建N个Pandas数据框(每个小于2 GB)(水平分区),并创建N个不同的Spark数据框,然后将它们合并(Union)以创建一个最终的数据框,写入HDFS。我假设您的主机非常强大,但您也可以使用运行Spark的集群。 - Edge7
4个回答

23
意思是有一个pandas dataframe,现在需要用pyarrow将其转换为spark。pyarrow.Table.fromPandas 是你要找的函数。
Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)

Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa

pdf = ...  # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf)  # type: pyarrow.lib.Table

结果可以直接写入Parquet/HDFS,而无需通过Spark传递数据:

import pyarrow.parquet as pq

fs  = pa.hdfs.connect()

with fs.open(path, "wb") as fw
    pq.write_table(adf, fw)

另请参阅

Spark笔记:

此外,自Spark 2.3(当前主要版本)以来,Arrow直接支持createDataFrameSPARK-20791 - 使用Apache Arrow改进Spark createDataFrame from Pandas.DataFrame)。它使用SparkContext.defaultParallelism来计算块的数量,因此您可以轻松控制单个批次的大小。

最后,defaultParallelism 可以用于控制使用标准的 _convert_from_pandas 生成的分区数量,从而有效地减小切片的大小,使其更易管理。

不幸的是,这些都不太可能解决您当前的内存问题。两者都依赖于 parallelize,因此会将所有数据存储在驱动节点的内存中。切换到 Arrow 或调整配置只能加速处理过程或解决块大小限制。

实际上,在这种情况下,只要使用本地 Pandas DataFrame 作为输入,我认为没有理由切换到 Spark。这种情况下最严重的瓶颈是驱动程序的网络 I/O,分发数据并不能解决这个问题。


1

1

另一种方法是将您的pandas数据帧转换为spark数据帧(使用pyspark),并使用save命令将其保存到hdfs中。

示例:
    df = pd.read_csv("data/as/foo.csv")
    df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
    sc = SparkContext(conf=conf)
    sqlCtx = SQLContext(sc)
    sdf = sqlCtx.createDataFrame(df)


这里的 astype 将你列的类型从 object 改变成 string,以免出现异常情况,因为 Spark 无法确定 Pandas 类型 object。但是请确保这些列确实是字符串类型。
现在要将您的 df 保存在 HDFS 中:
    sdf.write.csv('mycsv.csv')

-1
一个可行的方法是从大的数据框中创建N个小于2GB的Pandas数据框(水平分区),并创建N个不同的Spark数据框,然后将它们合并(Union)成一个最终的数据框,以写入HDFS。我假设您的主机非常强大,但您也有一个正在运行Spark的集群可用。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接