尝试应用lambda以创建新列时,“DataFrame”对象没有“apply”属性

4

我想在Pandas DataFrame中添加一列,但是遇到了一个奇怪的错误。

预期新列应该是从现有列转换而来的,可以通过在字典/哈希表中查找来完成。

# Loading data
df = sqlContext.read.format(...).load(train_df_path)

# Instanciating the map
some_map = {
    'a': 0, 
    'b': 1,
    'c': 1,
}

# Creating a new column using the map
df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)

这导致以下错误:
AttributeErrorTraceback (most recent call last)
<ipython-input-12-aeee412b10bf> in <module>()
     25 df= train_df
     26 
---> 27 df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)

/usr/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
    962         if name not in self.columns:
    963             raise AttributeError(
--> 964                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
    965         jc = self._jdf.apply(name)
    966         return Column(jc)

AttributeError: 'DataFrame' object has no attribute 'apply'

其他可能有用的信息: * 我正在使用Spark和Python 2。


3
我认为那不是一个pandas DataFrame。错误是由一个Spark DataFrame引起的吗? - ayhan
你的脚本中是否定义了一个名为 apply 的变量? - harvpan
@user2285236 这可能是我感到困惑的原因。您知道如何在Spark中应用lambda吗? - Pierre-Antoine
@Harv Ipan,我没有一个名为apply的变量。我的目标是运行一个lambda函数来创建一个新列。 - Pierre-Antoine
2个回答

6
您所使用的语法是针对 pandas DataFrame 的。要在spark DataFrame 中实现此功能,您应该使用 withColumn() 方法。这对于广泛定义的 DataFrame 函数 非常有效,但对于用户定义的映射函数则有些复杂。

一般情况

为了定义一个 udf,您需要指定输出数据类型。例如,如果您想应用一个返回 string 的函数 my_func,您可以创建如下的 udf

import pyspark.sql.functions as f
my_udf = f.udf(my_func, StringType())

然后,您可以使用my_udf创建一个新列,如下所示:

df = df.withColumn('new_column', my_udf(f.col("some_column_name")))

另一种选择是使用 select
df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))

具体问题

使用udf

在您的特定情况下,您想使用字典来翻译DataFrame的值。

以下是定义用于此目的的udf的方法:

some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())

注意我使用了dict.get(),因为您希望您的udf能够应对不良输入而稳健。

df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))

使用DataFrame函数

有时候不可避免地需要使用udf,但尽可能使用DataFrame函数通常更受欢迎。

下面是一种不使用udf来完成同样任务的方法。

关键在于遍历some_map中的项目,创建一个pyspark.sql.functions.when()函数列表。

some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]

现在你可以在select语句中使用pyspark.sql.functions.coalesce()函数:
df = df.select("*", f.coalesce(*some_map_func).alias("some_column_name"))

这个方法可行是因为when()默认情况下返回null,如果条件不满足,并且coalesce()会选择它遇到的第一个非null值。由于映射的键是唯一的,最多只有一列不为空。

1
您有一个Spark数据框,而不是Pandas数据框。要向Spark数据框添加新列:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
df = df.withColumn('new_column', F.udf(some_map.get, IntegerType())(some_column_name))
df.show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接