如何将numpy.array作为新列添加到pyspark.SQL DataFrame中?

6

这里是用于创建pyspark.sql DataFrame的代码:

import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
df = pd.DataFrame(np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]]), columns=['a','b','c'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)

这样sparkdf看起来像是

a  b  c
1  2  3
4  5  6
7  8  9
10 11 12

现在,我想添加一个 numpy 数组(或甚至是列表)作为新列。
new_col = np.array([20,20,20,20])

但是标准的做法是

sparkdf = sparkdf.withColumn('newcol', new_col)

失败了。 可能使用udf是解决问题的方法,但我不知道如何创建一个udf,使其为每个DataFrame行分配一个不同的值,即遍历new_col。 我已经查看了其他pyspark和pyspark.sql,但找不到解决方案。 另外,我需要在pyspark.sql内部保持,不能使用scala解决方案。谢谢!

1个回答

6
假设数据框已按数组值的顺序进行了排序,您可以对 RDD 进行压缩,并重新构建数据框,如下所示:
n = sparkdf.rdd.getNumPartitions()

# Parallelize and cast to plain integer (np.int64 won't work)
new_col = sc.parallelize(np.array([20,20,20,20]), n).map(int) 

def process(pair):
    return dict(pair[0].asDict().items() + [("new_col", pair[1])])

rdd = (sparkdf
    .rdd # Extract RDD
    .zip(new_col) # Zip with new col
    .map(process)) # Add new column

sqlContext.createDataFrame(rdd) # Rebuild data frame

您也可以使用连接操作符:
new_col = sqlContext.createDataFrame(
    zip(range(1, 5), [20] * 4),
    ("rn", "new_col"))

sparkdf.registerTempTable("df")

sparkdf_indexed = sqlContext.sql(
    # Make sure we have specific order and add row number
    "SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df")

(sparkdf_indexed
    .join(new_col, new_col.rn == sparkdf_indexed.rn)
    .drop(new_col.rn))

但是窗口函数组件不具有可扩展性,并且在处理较大数据集时应避免使用。

当然,如果您只需要单个值的列,可以直接使用lit

import pyspark.sql.functions as f
sparkdf.withColumn("new_col", f.lit(20))

但我假设这不是情况。


谢谢。看起来(第一个解决方案)你必须返回RDD,然后在最后再次转换为DataFrame,没有办法保持在pyspark.sql内部。这两个解决方案中哪一个提供更好的性能,即更快? - rstreppa
实际上,我收到了以下错误信息: ValueError: Can only zip with RDD which has the same number of partitions - rstreppa
对的。该列并非来自另一张表,而是现有表的操作结果。在numpy中计算速度很快,我需要将结果作为额外列引入。 - rstreppa
关于错误,请仅指定输入的分区数量。对于您的问题,它是行级操作还是使用多个行? - zero323
我对五列数据执行逐行操作,然后对每行的结果进行逐行乘法运算。看起来numpy很快,否则我认为我需要临时新建列来存储结果,然后再进行最终的乘法运算。 - rstreppa
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接