在大型数据集上运行Pandas UDF时出现问题

7

我目前正在进行一个项目,但是我很难理解Pandas UDF在PySpark中是如何工作的。

我有一个Spark集群,其中包括一个8核64GB的主节点和两个16核112GB的工作节点。我的数据集相当大,分成七个主要分区,每个分区包含约7800万行。该数据集由70列组成。我定义了一个Pandas UDF来对数据集执行一些操作,这些操作只能使用Python在Pandas数据帧上完成。

Pandas UDF是这样定义的:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
   #Some operations
   return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)

在进行操作之前,Pandas UDF崩溃了,没有任何方法可以使其正常工作。我怀疑出现了OOM错误。以上的代码运行了几分钟后就会崩溃,并显示一个错误代码,指示连接已重置。 然而,如果我在筛选一个分区之后调用.toPandas()函数,然后显示它,它就可以正常运行,没有错误。似乎只有在使用PandasUDF时才会发生错误。

我不明白它是如何工作的。Spark是否尝试一次性转换一个整个分区(7800万行)?如果是这样,它使用哪个内存?驱动程序内存?执行器的内存?如果是在驱动程序上,所有Python代码是否都在其上执行?

集群配置如下:

  • SPARK_WORKER_CORES=2
  • SPARK_WORKER_MEMORY=64g
  • spark.executor.cores 2
  • spark.executor.memory 30g(为Python实例分配内存)
  • spark.driver.memory 43g

我错过了什么还是没有办法通过PandasUDF运行7800万行?

2个回答

5
Spark是否尝试一次性转换一个完整的分区(7800万行)?
是的,这正是发生的事情。Spark 3.0添加了对分块UDF的支持,这些UDF操作于Pandas的DataFrame或Series迭代器,但如果在Pandas数据框上使用Python进行的数据集操作,则这些可能不是您的正确选择。
它使用哪些内存?Driver的内存?执行程序的内存?
每个分区在各自的执行程序上本地处理,并使用Arrow流将数据传递给Python工作程序并从中获取数据。
我有什么遗漏或者没有办法通过PandasUDF运行7800万行数据?
只要你有足够的内存来处理Arrow输入、输出(特别是如果数据被复制)、辅助数据结构以及JVM开销,它应该能够处理大型数据集。
但在这样小的集群上,最好是将输出进行分区,并直接使用Pandas读取数据,而不是使用Spark。这样,您将能够利用所有可用资源(即>100GB /解释器)进行数据处理,而不是将这些资源浪费在次要任务上(具有16GB - 开销 /解释器)。

非常感谢您的回答,我现在更好地理解了它的工作原理。您认为对于我所拥有的集群来说,什么样的Spark配置是可以接受的呢? - naifmeh
我尝试将spark.executor.memory设置为最大值,并将spark.python.worker.memory设置为该值的50%(超过35g),但仍然崩溃。此配置有问题吗? - naifmeh
“spark.python.worker.memory”在这里完全没有意义(它适用于RDD聚合,而这里不会发生这种情况)。至于配置-对于本地Spark作业,您可能可以找到一些甜点,但对于您描述的作业,我真的看不出有什么意义。 - user10938362
我认为需要注意的是,在Spark中使用Pandas UDF而不是直接使用Pandas,可以在应用程序中实现一致的API,并且无需来回转换。 - Brian

-1

关于在大型pyspark数据框上使用Pandas UDF的一般问题的答案:

如果您遇到诸如java.lang.OutOfMemoryError : GC overhead limit exceededjava.lang.OutOfMemoryError: Java heap space等内存不足错误,并且增加内存限制没有起作用,请确保启用了pyarrow。它默认是禁用的。

在pyspark中,您可以使用以下命令启用它:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

更多信息请参见此处


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接