将Spark dataframe 转换为 Pandas/R dataframe 的要求

6
我正在Hadoop的YARN上运行Spark。这个转换是如何工作的?在转换之前是否进行了collect()操作?
另外,我需要在每个从节点上安装Python和R才能使转换工作?我很难找到相关文档。
1个回答

14

toPandas(PySpark)/ as.data.frame(SparkR)

在创建本地数据框之前,必须先收集数据。例如,toPandas 方法如下:

def toPandas(self):
    import pandas as pd
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)

每个节点都需要安装Python以及其所有依赖项。

SparkR中(as.data.frame)的对应函数只是collect的别名。

简而言之,在两种情况下数据都会被collected到驱动程序节点,并分别转换为本地数据结构(pandas.DataFrame和Python中的base::data.frame,R中的data.frame)。

向量化的用户定义函数

Spark 2.3.0以来,PySpark还提供了一组pandas_udf (SCALAR, GROUPED_MAP, GROUPED_AGG),它们在由以下定义的数据块上并行操作:

  • 对于SCALAR变体,是分区
  • 对于GROUPED_MAPGROUPED_AGG,是分组表达式。

每个块由以下表示:

  • 对于SCALARGROUPED_AGG变体,是一个或多个pandas.core.series.Series
  • 对于GROUPED_MAP变体,是一个单独的pandas.core.frame.DataFrame

同样,自Spark 2.0.0以来,SparkR也提供了在由分区和分组表达式定义的data.frames上操作的dapplygapply函数。

上述函数:

  • 不会收集到驱动程序。除非数据仅包含单个分区(即使用coalesce(1))或分组表达��是微不足道的(即groupBy(lit(1))),否则没有单节点瓶颈。
  • 将相应的数据块加载到相应执行器的内存中。因此,它受限于各个执行器可用的内存大小/每个数据块的大小。

那么,toPandas 总是在驱动节点上吗?你永远不能在工作节点上的 map 函数中使用 pandas dataframe 吗? - Matthias
@Matthias toPandas 总是在驱动程序上运行。如果您想要,可以在 map 内部使用 pandas 对象,但这并不是特别关注 Spark 的事情。在执行器线程内获取的任何内容都只是普通的本地对象。 - zero323
啊,谢谢您的澄清。我今天刚和我的同事讨论了一个场景(请参见此链接)。 - Matthias
根据你的示例,它是以分布式方式工作的?这意味着您可以在map函数内创建许多pandas数据帧。它们是否位于驱动节点上? - Matthias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接