我有一个Python类,用于在Spark中加载和处理一些数据。在需要完成的各种任务中,我正在生成从Spark dataframe中的各列派生的虚拟变量列表。我的问题是,我不确定如何正确定义用户定义的函数来实现我所需的功能。
我目前有一个方法,可以在底层dataframe RDD上映射时解决一半的问题(请记住,这是大型data_processor
类中的一个方法):
def build_feature_arr(self,table):
# this dict has keys for all the columns for which I need dummy coding
categories = {'gender':['1','2'], ..}
# there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
if table == 'users':
iter_over = self.config.dyadic_features_to_include
elif table == 'activty':
iter_over = self.config.user_features_to_include
def _build_feature_arr(row):
result = []
row = row.asDict()
for col in iter_over:
column_value = str(row[col]).lower()
cats = categories[col]
result += [1 if column_value and cat==column_value else 0 for cat in cats]
return result
return _build_feature_arr
本质上,这个函数针对指定的数据框架,获取指定列的分类变量值,并返回这些新虚拟变量的值列表。这意味着以下代码:
data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr('users'))
返回类似于以下内容:
In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[0, 1, 1, 0, 0, 0],
[1, 0, 1, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 1]]
这正是我想要的生成虚拟变量列表的方法,但是我的问题在于:我该如何(a)创建一个具有类似功能的UDF,以便在Spark SQL查询中使用它(或者其他方式),还是(b)将上述map操作的RDD作为新列添加到user_data数据帧中呢?无论哪种方式,我需要生成一个新数据帧,其中包含来自user_data的列,以及一个新列(称为
feature_array
),其中包含上述函数的输出(或者与之等效的内容)。
model.matrix
)。很可能是为了训练某种线性模型。Rish解释-字符串索引器类似于从字符串创建因子列,而one hot则调用model.matrix
:) @DavidArenburg - zero323from pyspark.mllib.linalg import DenseVector
应该被替换为from pyspark.ml.linalg import DenseVector
,否则在VectorIndexer
阶段可能会出现类型错误。 - EnriqueH