我正在Hadoop的YARN上运行Spark。这个转换是如何工作的?在转换之前是否进行了collect()操作?
另外,我需要在每个从节点上安装Python和R才能使转换工作?我很难找到相关文档。
另外,我需要在每个从节点上安装Python和R才能使转换工作?我很难找到相关文档。
toPandas
(PySpark)/ as.data.frame
(SparkR)
在创建本地数据框之前,必须先收集数据。例如,toPandas
方法如下:
def toPandas(self):
import pandas as pd
return pd.DataFrame.from_records(self.collect(), columns=self.columns)
每个节点都需要安装Python以及其所有依赖项。
SparkR中(as.data.frame
)的对应函数只是collect
的别名。
简而言之,在两种情况下数据都会被collected
到驱动程序节点,并分别转换为本地数据结构(pandas.DataFrame
和Python中的base::data.frame
,R中的data.frame
)。
向量化的用户定义函数
自Spark 2.3.0以来,PySpark还提供了一组pandas_udf
(SCALAR
, GROUPED_MAP
, GROUPED_AGG
),它们在由以下定义的数据块上并行操作:
SCALAR
变体,是分区GROUPED_MAP
和GROUPED_AGG
,是分组表达式。每个块由以下表示:
SCALAR
和GROUPED_AGG
变体,是一个或多个pandas.core.series.Series
。GROUPED_MAP
变体,是一个单独的pandas.core.frame.DataFrame
。同样,自Spark 2.0.0以来,SparkR也提供了在由分区和分组表达式定义的data.frames
上操作的dapply
和gapply
函数。
上述函数:
coalesce(1)
)或分组表达��是微不足道的(即groupBy(lit(1))
),否则没有单节点瓶颈。