Spark:DataFrame 中 zipWithIndex 的等效方法

6
假设我有以下数据框:
dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)]
df = sc.parallelize(dummy_data).toDF(['letter','number'])

我希望您创建以下数据框:

代码如下:

[('a',0),('b',2),('c',1),('d',3),('e',0)]

我的做法是将它转换为rdd,并使用zipWithIndex函数,然后联接结果:

convertDF = (df.select('number')
              .distinct()
              .rdd
              .zipWithIndex()
              .map(lambda x:(x[0].number,x[1]))
              .toDF(['old','new']))


finalDF = (df
            .join(convertDF,df.number == convertDF.old)
            .select(df.letter,convertDF.new))

在dataframes中是否有类似于zipWithIndex的功能?还有其他更有效的方法来完成此任务吗?


2
https://dev59.com/wlwY5IYBdhLWcg3wU2XA - zero323
1个回答

9
请查看https://issues.apache.org/jira/browse/SPARK-23074,寻找数据框直接功能的对应实现。如果您希望在Spark中某个时间点看到它,请为该Jira投票。

然而,在PySpark中有一个解决方法:

def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe 
        and preserves a schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))

    return spark.createDataFrame(new_rdd, new_schema)

这也可以在abalon软件包中获得。


对于Python 3+,需要进行一些小的更改才能使其正常工作,因为map处理元组的方式不同。以下代码将作为单个参数传递元组,并使用[]符号读取元组的元素,new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0]))) - Paladin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接