我目前正在进行一个项目,但是我很难理解Pandas UDF在PySpark中是如何工作的。
我有一个Spark集群,其中包括一个8核64GB的主节点和两个16核112GB的工作节点。我的数据集相当大,分成七个主要分区,每个分区包含约7800万行。该数据集由70列组成。我定义了一个Pandas UDF来对数据集执行一些操作,这些操作只能使用Python在Pandas数据帧上完成。
Pandas UDF是这样定义的:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
在进行操作之前,Pandas UDF崩溃了,没有任何方法可以使其正常工作。我怀疑出现了OOM错误。以上的代码运行了几分钟后就会崩溃,并显示一个错误代码,指示连接已重置。 然而,如果我在筛选一个分区之后调用.toPandas()函数,然后显示它,它就可以正常运行,没有错误。似乎只有在使用PandasUDF时才会发生错误。
我不明白它是如何工作的。Spark是否尝试一次性转换一个整个分区(7800万行)?如果是这样,它使用哪个内存?驱动程序内存?执行器的内存?如果是在驱动程序上,所有Python代码是否都在其上执行?
集群配置如下:
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=64g
- spark.executor.cores 2
- spark.executor.memory 30g(为Python实例分配内存)
- spark.driver.memory 43g
我错过了什么还是没有办法通过PandasUDF运行7800万行?