使用SparkR编写分区parquet文件

3

我有两个脚本,一个是用R编写的,另一个是用pyspark编写的短小脚本,后者使用了前者的输出。为了简化操作,我想将第二个脚本的功能复制到第一个脚本中。

第二个脚本非常简单--读取一堆csv文件并将它们作为分区parquet输出:

Original Answer: "最初的回答"

spark.read.csv(path_to_csv, header = True) \
     .repartition(partition_column).write \
     .partitionBy(partition_column).mode('overwrite') \
     .parquet(path_to_parquet)

在R中,这应该同样简单,但我无法弄清楚如何匹配SparkR中的partitionBy功能。到目前为止,我已经得到了以下内容:

最初的回答:

library(SparkR); library(magrittr)
read.df(path_to_csv, 'csv', header = TRUE) %>%
  repartition(col = .$partition_column) %>%
  write.df(path_to_parquet, 'parquet', mode = 'overwrite')

这段代码成功地为partition_column的每个值写了一个parquet文件。问题在于生成的文件目录结构有误;与Python生成的目录结构不同。
/path/to/parquet/
  partition_column=key1/
    file.parquet.gz
  partition_column=key2/
    file.parquet.gz
  ...

"最初的回答":R只产生
/path/to/parquet/
  file_for_key1.parquet.gz
  file_for_key2.parquet.gz
  ...

我有点不明白,SparkR中的partitionBy函数似乎只涉及窗口函数的上下文,并且我在手册中没有看到其他可能相关的内容。也许可以通过...传递参数,但是我在文档或在线搜索中都没有看到任何示例。

最初的回答:

1个回答

1

Spark <= 2.x 不支持输出分区。

然而,SparR >= 3.0.0 (SPARK-21291 - R partitionBy API) 将提供支持,使用以下语法:

write.df(
  df, path_to_csv, "parquet", mode = "overwrite",
  partitionBy = "partition_column"
)

由于相应的PR仅修改R文件,因此如果无法升级到开发版本,则应该能够修补任何SparkR 2.x分发版:

git clone https://github.com/apache/spark.git
git checkout v2.4.3  # Or whatever branch you use
# https://github.com/apache/spark/commit/cb77a6689137916e64bc5692b0c942e86ca1a0ea
git cherry-pick cb77a6689137916e64bc5692b0c942e86ca1a0ea
R -e "devtools::install('R/pkg')"

在客户端模式下,这只需要在驱动节点上进行即可。 但这些并不致命,也不应该引起任何严重问题。

看起来 partition by 的参数是在7个月前解决的?我不在我的机器上所以不能查看发布情况...最新的版本中没有 partitionBy 参数吗?(顺便说一下,我正在使用2.3.1) - undefined
不是这样的。修复版本是3.0,而最新发布的版本是2.4.3。 - undefined
谢谢。初步看PR,似乎已经可行...稍后会进行测试。 - undefined
是的,这个很容易做到。我已经编辑了答案并添加了说明。 - undefined
1
哦,实际上我指的是更简单的事情。就是直接复制粘贴PR中的方法定义,然后提取callJMethod = SparkR:::callJMethodsetWriteOptionshandledCallJMethod,这样即使不需要更新软件包也能正常工作。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接