使用Spark DataFrames对多个字符串分类特征进行独热编码

6

我的目标是使用Spark DataFrames对一组分类列进行One-Hot编码。例如,就像Pandas中的get_dummies()函数一样。

数据集bureau.csv最初来自于Kaggle的一个比赛Home Credit Default Risk。这里是我的入口表示例entryData,仅筛选出KEY = 100001的数据。

# primary key
KEY = 'SK_ID_CURR'
data = spark.read.csv("bureau.csv", header=True, inferSchema=True)
# sample data from bureau.csv of 1716428 rows
entryData = data.select(columnList).where(F.col(KEY) == 100001).show()
print(entryData)

+----------+-------------+---------------+---------------+
|SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY|    CREDIT_TYPE|
+----------+-------------+---------------+---------------+
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Closed|     currency 1|Consumer credit|
|    100001|       Active|     currency 1|Consumer credit|
|    100001|       Active|     currency 1|Consumer credit|
|    100001|       Active|     currency 1|Consumer credit|
+----------+-------------+---------------+---------------+

我想通过创建函数 catg_encode(entryData, columnList) 来对列表 columnList 进行独热编码。

columnList = cols_type(entryData, obj=True)[1:]
print(columnList)
['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']

注意 cols_type() 是一个返回列列表的函数,如果obj=True则返回分类列,否则返回数值列。

我已经成功地对第一列'CREDIT_ACTIVE'进行了独热编码,但我无法同时对所有列进行编码,也就是说无法构建catg_encode函数。

# import necessary modules
from pyspark.sql import functions as F

# look for all distinct categoris within a given feature (here 'CREDIT_ACTIVE')
categories = entryData.select(columnList[0]).distinct().rdd.flatMap(lambda x: x).collect()
# one-hot encode the categories
exprs = [F.when(F.col(columnList[0]) == category, 1).otherwise(0).alias(category) for category in categories]
# nice table with encoded feature 'CREDIT_ACTIVE'
oneHotEncode = entryData.select(KEY, *exprs)
print(oneHotEncode)

+----------+--------+----+------+------+
|SK_ID_CURR|Bad debt|Sold|Active|Closed|
+----------+--------+----+------+------+
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     0|     1|
|    100001|       0|   0|     1|     0|
|    100001|       0|   0|     1|     0|
|    100001|       0|   0|     1|     0|
+----------+--------+----+------+------+

这里的特征'CREDIT_ACTIVE'有4个不同的类别; ['坏账', '已售出', '活跃', '关闭'].

注意: 我甚至尝试了IndexToStringOneHotEncoderEstimator,但对于这个特定的任务没有帮助。

我期望得到以下输出:

+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
|SK_ID_CURR|Bad debt|Sold|Active|Closed|currency 1|currency 2|currency 3|currency 4|..........|...
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|   
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     0|     1|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     1|     0|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     1|     0|         1|         0|         0|         0|        ..|
|    100001|       0|   0|     1|     0|         1|         0|         0|         0|        ..|
+----------+--------+----+------+------+----------+----------+----------+----------+----------+--- 

连续的点...代表了特征'CREDIT_TYPE'的其余类别,包括:['购买设备贷款', '现金贷款(非指定用途)', '小额贷款', '消费信贷', '移动运营商贷款', '其他类型贷款', '抵押贷款', '银行间信贷', '流动资金贷款', '汽车贷款', '房地产贷款', '未知类型贷款', '业务发展贷款', '信用卡', '股票购买贷款(保证金借贷)']注意:我看到了这篇文章E-num / get Dummies in pyspark ,但它不能自动化处理多个列的情况,而我的问题正是如此。该文章提供了一种解决方案,即为每个分类特征编写单独的代码,但这不是我的问题。

如果这些不同类别的列的顺序不重要,您可以创建一个包含所有目标列的数组列,例如 df.withColumn('arr', F.array(columnList)),然后使用 CountVectorizer 一次性创建独热编码。以下是我以前的一篇帖子作为示例:https://stackoverflow.com/questions/58010126 - jxc
2个回答

3

有两种方法可以榨这个柠檬,让我们来看看它们。

  1. 旋转和连接
import pyspark.sql.functions as f

df1 = spark._sc.parallelize([
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Closed', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100001, 'Active', 'currency 1', 'Consumer credit'],
    [100002, 'Active', 'currency 2', 'Consumer credit'],
]).toDF(['SK_ID_CURR', 'CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE'])

# this can be done dynamically, but I don't have all categories
categories = ['Active', 'Closed', 'Bad debt', 'Sold']

# we need to pivot without aggregation, so I need to add an `id` column and group by it as well
credit_groups = (
  df1.withColumn('id', f.monotonically_increasing_id())
     .groupBy('SK_ID_CURR', 'id')
     .pivot('CREDIT_ACTIVE', values=categories)
     .agg(f.lit(1))
     .drop('id')
)

# currency groups are just a 1 for each currency and ID, as per the example data
# if this is not the case, something more clever needs to be here 
currency_groups = df1.groupBy('SK_ID_CURR').pivot('CREDIT_CURRENCY').agg(f.lit(1))

# join the two pivoted tables on the ID and fill nulls to zeroes
credit_groups.join(currency_groups, on=['SK_ID_CURR'], how='inner').na.fill(0).show()

+----------+------+------+--------+----+----------+----------+
|SK_ID_CURR|Active|Closed|Bad debt|Sold|currency 1|currency 2|
+----------+------+------+--------+----+----------+----------+
|    100002|     1|     0|       0|   0|         0|         1|
|    100001|     0|     1|       0|   0|         1|         0|
|    100001|     1|     0|       0|   0|         1|         0|
|    100001|     1|     0|       0|   0|         1|         0|
|    100001|     0|     1|       0|   0|         1|         0|
|    100001|     0|     1|       0|   0|         1|         0|
|    100001|     1|     0|       0|   0|         1|         0|
|    100001|     0|     1|       0|   0|         1|         0|
+----------+------+------+--------+----+----------+----------+
  1. 使用StringIndexerOneHotEncoderEstimator来完成如下操作:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_NUMERIC").fit(df1) for column in ['CREDIT_ACTIVE', 'CREDIT_CURRENCY']]

pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df1).transform(df1)
df_indexed.show()

+----------+-------------+---------------+---------------+---------------------+-----------------------+
|SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY|    CREDIT_TYPE|CREDIT_ACTIVE_NUMERIC|CREDIT_CURRENCY_NUMERIC|
+----------+-------------+---------------+---------------+---------------------+-----------------------+
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Closed|     currency 1|Consumer credit|                  0.0|                    0.0|
|    100001|       Active|     currency 1|Consumer credit|                  1.0|                    0.0|
|    100001|       Active|     currency 1|Consumer credit|                  1.0|                    0.0|
|    100001|       Active|     currency 1|Consumer credit|                  1.0|                    0.0|
|    100002|       Active|     currency 2|Consumer credit|                  1.0|                    1.0|
+----------+-------------+---------------+---------------+---------------------+-----------------------+

从此刻开始,您可以对新创建的数字列使用一位有效编码。我个人建议选择路线1,因为它更易读。但是,路线2允许您将 OneHotEncoderEstimator 链接到声明的 Pipeline 中,使代码在声明后一行可执行。希望这有所帮助。


@napoleon-borntoparty,感谢您的时间和回答。我可以说,在这两种方法中,我们仍然需要运行您的代码与数据集中分类特征的数量相同次数,这并没有解决我所问的问题。 - Joe
@Joe,我明白了,你想要串联你的分类数据。是的,这很合理,那么你可以采用第二种方式,并且使用多个StringIndexer进行“管道”处理,然后为['CREDIT_ACTIVE', 'CREDIT_CURRENCY', '...']中包含的所有分类变量加上OneHotEncoderEstimator - Napoleon Borntoparty
谢谢,@napoleon-borntoparty。我明白了,但它没有对列进行one-hot编码。我的意思是,它没有输出我期望的结果(请参见我的帖子)。 - Joe

0

在SparkML中定义的OHE一次只能处理一列数据,这可能不太优化。您可以自己实现多列OHE。您实际上已经在正确的轨道上了。

import pyspark.sql.functions as F

# let's define some data
l = [('a', 1), ('b', 2), ('c', 1), ('a', 1)]
df = spark.createDataFrame(l, ['c1', 'c2'])
# the list of column we want to encode
cols = ['c1', 'c2']

# defining a struct that associates each column name to its value
col_struct = [
  F.struct(F.lit(c).alias('key'),
           F.col(c).cast('string').alias('value')) for c in cols
]

# Then we explode these struct, group by column name and collect the
# distinct values. Finally, we collect everything to the driver.
ohe_rows = df.distinct()\
  .select(*cols).select(F.explode(F.array(*col_struct)).alias("x"))\
  .groupBy("x.key")\
  .agg(F.collect_set(F.col("x.value")).alias("values"))\
  .collect()

# then we build one spark column per column and per value of that column
# so as to encode the values
ohe = [
          [
              F.when(F.col(row['key']) == value, 1)
               .otherwise(0)
               .alias(row['key']+'_'+value) for value in row['values']
          ] for row in ohe_rows
      ]

# ohe is a list of lists so we use itertools to flatten it
import itertools
ohe_list = list(itertools.chain(*ohe))

# and voila
df.select(* [df.c1, df.c2] + ohe_list).show()
+---+---+----+----+----+----+----+
| c1| c2|c1_c|c1_b|c1_a|c2_1|c2_2|
+---+---+----+----+----+----+----+
|  a|  1|   0|   0|   1|   1|   0|
|  b|  2|   0|   1|   0|   0|   1|
|  c|  1|   1|   0|   0|   1|   0|
|  a|  1|   0|   0|   1|   1|   0|
+---+---+----+----+----+----+----+
# or simply df.select(*ohe_list)

您可以将OneHotEncoderEstimator链接到Pipeline中,就像我下面使用StringIndexer一样。 - Napoleon Borntoparty
没错,但问题是关于如何并行计算,我不认为Spark在链接转换时进行了优化。 - Oli

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接