pyspark中与`df.loc`等效的方法是什么?

12

我正在寻找Pyspark数据框架的等效替代品。具体来说,我想在Pyspark数据框架上执行以下操作。

# in pandas dataframe, I can do the following operation
# assuming df = pandas dataframe
index = df['column_A'] > 0.0
amount = sum(df.loc[index, 'column_B'] * df.loc[index, 'column_C']) 
        / sum(df.loc[index, 'column_C'])

我想知道在pyspark DataFrame中,做这个操作的等效方法是什么?

3个回答

3

Spark的DataFrame没有严格的顺序,因此索引没有意义。相反,我们使用类似SQL的DSL语言。在这里,您将使用wherefilter)和select。如果数据看起来像这样:

import pandas as pd
import numpy as np
from pyspark.sql.functions import col, sum as sum_

np.random.seed(1)

df = pd.DataFrame({
   c: np.random.randn(1000) for c in ["column_A", "column_B", "column_C"]
})

amount 代表金额。

amount
# 0.9334143225687774

对应的Spark版本是:

sdf = spark.createDataFrame(df)

(amount_, ) = (sdf
    .where(sdf.column_A > 0.0)
    .select(sum_(sdf.column_B * sdf.column_C) / sum_(sdf.column_C))
    .first())

结果是数字相等的:

abs(amount - amount_)
# 1.1102230246251565e-16

您可以使用条件语句:
from pyspark.sql.functions import when

pred = col("column_A") > 0.0

amount_expr = sum_(
  when(pred, col("column_B")) * when(pred, col("column_C"))
) / sum_(when(pred, col("column_C")))

sdf.select(amount_expr).first()[0]
# 0.9334143225687773

这些看起来更像Pandas,但是更冗长。


2

使用 RDD 这个对象可以很容易地实现(我对 spark.sql.DataFrame 不太熟悉):

x, y = (df.rdd
        .filter(lambda x: x.column_A > 0.0)
        .map(lambda x: (x.column_B*x.column_C, x.column_C))
        .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1])))
amount = x / y

或者过滤DataFrame然后跳转到RDD

x, y = (df
        .filter(df.column_A > 0.0)
        .rdd
        .map(lambda x: (x.column_B*x.column_C, x.column_C))
        .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1])))
amount = x / y

经过一番搜索,我不确定这是否是最高效的方法,但不需要涉及到RDD

x, y = (df
        .filter(df.column_A > 0.0)
        .select((df.column_B * df.column_C).alias("product"), df.column_C)
        .agg({'product': 'sum', 'column_C':'sum'})).first()
amount = x / y

0

更快的Pysparky答案

import pyspark.sql.functions as f
sdf=sdf.withColumn('sump',f.when(f.col('colA')>0,f.col('colB')*f.col('colC')).otherwise(0))
z=sdf.select(f.sum(f.col('sump'))/f.sum(f.col('colA'))).collect()
print(z[0])

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接