PySpark feature vectors that allow null values

I want to use a classifier in PySpark on a dataset that contains NULL values. The NULLs appear in features I have created, such as a success percentage. I need to keep the NULL values, because I have shown with pandas that keeping them produces a stronger model, so I don't want to impute them with zeros or the median.
I understand that a feature vector can be built with VectorAssembler, but it doesn't work when the data contains NULL values. Is there a way to create a feature vector that contains NULLs and can be used with LightGBMClassifier?
I demonstrate the problem using the diamonds.csv data, with one clean, unedited copy and one copy into which I have inserted nulls.
import pandas as pd
import numpy as np
import random

from mmlspark import LightGBMClassifier
from pyspark.sql.functions import * 
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator

diamondsData = pd.read_csv("/dbfs/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv").iloc[:,1:] 
diamondsData_clean = diamondsData.copy()
diamondsData_clean = spark.createDataFrame(diamondsData_clean)

diamondsData['randnum'] = diamondsData.apply(lambda x: random.uniform(0, 1), axis=1)
diamondsData['depth'] = diamondsData[['depth','randnum']].apply(lambda x: np.nan if x['randnum'] < 0.05 else x['depth'], axis=1)
diamondsData_nulls = spark.createDataFrame(diamondsData)
diamondsData_nulls = diamondsData_nulls.select([when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c for c, t in diamondsData_nulls.dtypes])
diamondsData_nulls.show(10)
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|             randnum|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|  0.0755707311804259|
| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|  0.9719186135587407|
| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|  0.5237755344569698|
| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63| 0.12103842271165433|
| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75| 0.48213792315234205|
| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|  0.5461421401855059|
| 0.24|Very Good|    I|   VVS1| null| 57.0|  336|3.95|3.98|2.47|0.013923864248332252|
| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|   0.551950501743583|
| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49| 0.09444899320350808|
| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|  0.5246023480324566|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
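Spark distinguishes NaN from null: createDataFrame turns pandas' np.nan into NaN doubles, and the when(~isnan(c), col(c)).alias(c) projection above rewrites those NaNs as proper nulls. A minimal pure-Python sketch of that per-value rewrite (nan_to_null is a hypothetical helper for illustration, not a Spark API):

```python
import math

def nan_to_null(value):
    # Mirrors when(~isnan(c), col(c)).alias(c): a NaN fails the condition,
    # so when() with no otherwise() yields null (None here); all other
    # values pass through unchanged.
    if isinstance(value, float) and math.isnan(value):
        return None
    return value

depths = [61.5, float("nan"), 59.8]
print([nan_to_null(v) for v in depths])  # [61.5, None, 59.8]
```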

The stages for the pipeline are then configured.
categoricalColumns = ['cut', 'color', 'clarity']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
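As a rough illustration of what the categorical stages compute, here is a pure-Python sketch (not the Spark implementation): StringIndexer assigns indices by descending label frequency, and OneHotEncoderEstimator expands each index into a 0/1 vector, dropping the last category by default. The alphabetical tie-break among equally frequent labels is an assumption here, and string_index/one_hot are hypothetical helpers.

```python
def string_index(column):
    # Like StringIndexer: labels ordered by descending frequency get
    # indices 0, 1, ...; ties broken alphabetically (an assumption).
    freq = {}
    for v in column:
        freq[v] = freq.get(v, 0) + 1
    ordered = sorted(freq, key=lambda k: (-freq[k], k))
    mapping = {label: i for i, label in enumerate(ordered)}
    return [mapping[v] for v in column]

def one_hot(index, size):
    # Like OneHotEncoderEstimator's expansion: a 0/1 vector of length
    # (num_categories - 1); the last category encodes as all zeros.
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

cuts = ['Ideal', 'Premium', 'Ideal', 'Good']
idx = string_index(cuts)                  # Ideal is most frequent, so index 0
print([one_hot(i, 2) for i in idx])       # [[1.0, 0.0], [0.0, 0.0], [1.0, 0.0], [0.0, 1.0]]
```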

The pipeline works on the clean diamonds data: it is transformed and, as expected, returns the label column and the feature vector.
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_clean)
diamonds_final = pipelineModel.transform(diamondsData_clean)
selectedCols = ['price', 'features']
diamonds_final = diamonds_final.select(selectedCols)
diamonds_final.printSchema()
diamonds_final.show(6)
root
 |-- price: long (nullable = true)
 |-- features: vector (nullable = true)

+-----+--------------------+
|price|            features|
+-----+--------------------+
|  326|(23,[0,5,12,17,18...|
|  326|(23,[1,5,10,17,18...|
|  327|(23,[3,5,13,17,18...|
|  334|(23,[1,9,11,17,18...|
|  335|(23,[3,12,17,18,1...|
|  336|(23,[2,14,17,18,1...|
+-----+--------------------+

However, attempting the same steps on the diamondsData_nulls DataFrame returns an error.
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_nulls)
diamonds_final_nulls = pipelineModel.transform(diamondsData_nulls)
selectedCols = ['price', 'features']
diamonds_final_nulls = diamonds_final_nulls.select(selectedCols)
diamonds_final_nulls.printSchema()
diamonds_final_nulls.show(6)
root
 |-- price: long (nullable = true)
 |-- features: vector (nullable = true)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 133952.0 failed 4 times, most recent failure: Lost task 0.3 in stage 133952.0 (TID 1847847, 10.139.64.4, executor 291): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<cutclassVec:vector,colorclassVec:vector,clarityclassVec:vector,carat:double,depth:double,table:double,x:double,y:double,z:double>) => vector)

This is a known issue that is being worked on (https://github.com/Azure/mmlspark/issues/304), but for now I can't find a feature extractor that lets NULLs pass through.
Attempt using the handleInvalid = "keep" parameter
User Machiel suggested using the handleInvalid parameter on the StringIndexer and OneHotEncoderEstimator functions; it turns out it also needs to be applied to the VectorAssembler. I updated my code as follows:
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index',handleInvalid = "keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"],handleInvalid = "keep")
    stages += [stringIndexer, encoder]

numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features",handleInvalid="keep")
stages += [assembler]
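Conceptually, handleInvalid="keep" tells the VectorAssembler to let missing values flow into the output vector as NaN instead of raising, which LightGBM can then treat as missing. A minimal sketch of that behavior (assemble is a hypothetical stand-in, not the Spark implementation):

```python
import math

def assemble(values, handle_invalid="error"):
    # Concatenate numeric inputs into one feature vector, as VectorAssembler does.
    out = []
    for v in values:
        if v is None or (isinstance(v, float) and math.isnan(v)):
            if handle_invalid == "keep":
                out.append(float("nan"))  # the missing value survives in the vector
            else:
                raise ValueError("Values to assemble cannot be null.")
        else:
            out.append(float(v))
    return out

row = [0.24, None, 57.0]                      # depth is null
print(assemble(row, handle_invalid="keep"))   # [0.24, nan, 57.0]
```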
1 Answer

For strings and categorical numbers you can use the handleInvalid parameter to make Spark create a bucket for missing values:
OneHotEncoderEstimator(inputCols=..., outputCols=..., handleInvalid='keep')
StringIndexer(inputCol=..., outputCol=..., handleInvalid='keep')

Hi Machiel, thanks for the suggestion. I wasn't aware of that parameter, and your description of it sounds perfect, so I've upvoted. However, I've just tried updating my code to include it and it still returns the same error. I'll update my question to show that I've tried this approach. - Spainey
Hello again, it turns out the VectorAssembler also has a handleInvalid parameter, and including it there solved my problem. Thanks very much for your help! :) - Spainey
