PySpark数据框是否有类似Pandas中的"pipe"函数?

5
例如在 Pandas 中,我会这样做:
data_df = (
     pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3']))
     .pipe(lambda df: df[df.col1 != 'a'])
 )   

这类似于R语言中的管道符号%>%

在PySpark中是否有类似的东西?


2
我不这么认为。至少,这不是一个好的方法。在PySpark中,你的DataFrame分布在多个服务器上。如果有一个像Panda的pipe一样的方法,它需要将所有服务器上的数据收集到一个单独的服务器上,然后调用lambda函数。你为什么想使用pipe?你想创建一个新列吗?添加新的列?对行/组/整个数据框进行转换或聚合? - MkWTF
@MkWTF 目前我只想重命名列。就这样。 - pettinato
你有多种方法可以做到这一点,查看此网站,非常适合学习Spark。我也在这里留下了pyspark文档,以防您需要它们。 - MkWTF
1
@MkWTF 我有几百列需要在循环中重命名,因此在pandas中使用pipe(standardize_col_names)是一个不错的选择。我的主要问题是关于pipe,@someshwar-kale已经回答了。管道在Spark中是相应的事情。 - pettinato
3个回答

4
您可以定义一个类似于“pandas”的pipe方法,并将其绑定到DataFrame类:
from pyspark.sql import DataFrame

def pipe(self, func, *args, **kwargs):
    return func(self, *args, **kwargs)

DataFrame.pipe = pipe 

然后,您可以将函数传递给pipe方法以应用于pyspark DataFrame。例如,假设您想从DataFrame my_df中选择除最后两列之外的所有列,并更改其列后。您可以使用pipe来实现此目的:

my_new_df = (
    my_df
    # Perform some operations to add and/or remove columns
    ... 
    # At this point the list of columns is different 
    # from `my_df.columns`
    .pipe(lambda df: df.select(*df.columns[:-2]))
)

4
在PySpark中,管道函数称为transform,其文档可以在此处找到:这里
其行为与Pandas的管道操作符相同。
因此,在PySpark中的示例如下:
data_df = (
  spark.createDataFrame(pd.DataFrame(dict(col1=['a', 'b', 'c'], col2=['1', '2', '3'])))
  .transform(lambda df: df.filter("col1 != 'a'"))
)

2
只是注意到 transform 已经在 Spark 3.0 中实现了。 - Sergey Zakharov
这里是一个目前有效的链接,指向DataFrame.transform文档。 - Daniel Himmelstein
pyspark.sql.DataFrame.transform 只接受并返回数据框(Dataframe),而通过使用@luiz-otavio-v-b-oliveira的管道函数扩展Dataframe也可以接受任意参数。 - Bart Joosten

2

我认为在 pyspark 中,您可以通过使用 pipeline 轻松实现此管道功能。

  1. 将每个管道函数转换为转换器。Spark 提供了一些预定义的转换器,我们也可以利用它们。
  2. 使用这些转换器创建流水线。
  3. 运行流水线以转换提供的数据帧。

例如:让我们采用您提供的示例

要转换的输入数据帧

 val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("col1", "col2")
    df.show(false)
    df.printSchema()
    /**
      * +----+----+
      * |col1|col2|
      * +----+----+
      * |a   |1   |
      * |b   |2   |
      * |c   |3   |
      * +----+----+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: integer (nullable = false)
      */

1. 将每个管道函数转换为转换器

对于.pipe(lambda df: df[df.col1 != 'a']),我们可以轻松使用spark的SQLTransformer。因此不需要创建自定义转换器

2. 使用转换器创建流水线

 val transform1 = new SQLTransformer()
      .setStatement("select * from __THIS__ where col1 != 'a'")
    val transform2 = new SQLTransformer()
      .setStatement("select col1, col2, SQRT(col2) as col3 from __THIS__")

    val pipeline = new Pipeline()
      .setStages(Array(transform1, transform2))

3. 运行管道以转换提供的数据框

pipeline.fit(df).transform(df)
      .show(false)

    /**
      * +----+----+------------------+
      * |col1|col2|col3              |
      * +----+----+------------------+
      * |b   |2   |1.4142135623730951|
      * |c   |3   |1.7320508075688772|
      * +----+----+------------------+
      */

1
@0111001101110000 你有检查过这个吗? - Som
这让我相信,管道是最好的并行处理 pandas 管道功能的方式。我认为我可以为我的转换创建自己的转换器,但我不认为这会增加代码的可读性,而这正是管道函数的主要优点。 - pettinato

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接