基于列表的pyspark dataframe筛选或包含。

115

我正在尝试使用列表在pyspark中过滤数据框。我希望可以根据列表进行筛选,或者仅包括具有列表中值的记录。我的下面的代码无效:

# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [10,18,20]

# filter out records by scores by list l
records = df.filter(df.score in l)
# expected: (0,1), (0,1), (0,2), (1,2)

# include only records with these scores in list l
records = df.where(df.score in l)
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)
给出以下错误: ValueError: 无法将列转换为布尔值:在构建DataFrame布尔表达式时,请使用“&”表示“and”,“|”表示“or”,“~”表示“not”。
3个回答

167

它说的是 "df.score in l" 无法评估,因为 df.score 给你一个列,而 "in" 在该列类型上未定义,应使用 "isin "。

代码应该像这样:

# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [10,18,20]

# filter out records by scores by list l
records = df.filter(~df.score.isin(l))
# expected: (0,1), (0,1), (0,2), (1,2)

# include only records with these scores in list l
df.filter(df.score.isin(l))
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)
注意 where() filter()的别名,因此两者可以互换使用。


4
如果使用广播变量作为列表来处理,你应该如何操作?当我尝试这样做时,会提示错误“Broadcast”对象没有' _get_object_id'属性。 - flyingmeatball
@flyingmeatball 我认为你可以使用broadcast_variable_name.value来访问列表。 - Amelia N Chu
1
如果您想使用广播,则可以按照以下方式操作:l_bc = sc.broadcast(l),然后是 df.where(df.score.isin(l_bc.value)) - Alex_Gidiotis
如果您正在尝试根据列值列表筛选数据框,这可能会有所帮助:https://stackoverflow.com/a/66228314/530399 - Bikash Gyawali

62
基于 @user3133475 的答案,也可以像这样从 col() 函数调用 isin() 函数:isin()
from pyspark.sql.functions import col


l = [10,18,20]
df.filter(col("score").isin(l))

13
我发现对于大型数据框,join 实现比 where 要快得多:
def filter_spark_dataframe_by_list(df, column_name, filter_list):
    """ Returns subset of df where df[column_name] is in filter_list """
    spark = SparkSession.builder.getOrCreate()
    filter_df = spark.createDataFrame(filter_list, df.schema[column_name].dataType)
    return df.join(filter_df, df[column_name] == filter_df["value"])

为什么会这样呢?你必须处理如此庞大的数据吗? - Nebi M Aydin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接