如何在PySpark中创建自定义评估器

24

我正在尝试在PySpark MLlib中构建简单的自定义Estimator。我已经在这里找到了编写自定义Transformer的方法,但是我不确定如何在Estimator上执行它。我也不明白@keyword_only的作用以及为什么需要这么多的setter和getter。Scikit-learn似乎有一个适合自定义模型的文档(参见此处),但PySpark没有。

以下是示例模型的伪代码:

class NormalDeviation():
    def __init__(self, threshold = 3):
    def fit(x, y=None):
       self.model = {'mean': x.mean(), 'std': x.std()]
    def predict(x):
       return ((x-self.model['mean']) > self.threshold * self.model['std'])
    def decision_function(x): # does ml-lib support this?
2个回答

28

通常情况下,没有文档,因为就Spark 1.6 / 2.0而言,大部分相关API并不意味着是公开的。这应该会在Spark 2.1.0中改变(参见SPARK-7146)。

API相对复杂,因为它必须遵循特定的约定,以使给定的TransformerEstimatorPipeline API兼容。其中一些方法可能需要用于功能,例如读取和写入或网格搜索。类似keyword_only的其他方法只是简单的帮助程序而不是严格要求的。

假设您已经为平均参数定义了以下混合:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

标准差参数:

class HasStandardDeviation(Params):

    standardDeviation = Param(Params._dummy(),
        "standardDeviation", "standardDeviation", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)

阈值:

class HasCenteredThreshold(Params):

    centeredThreshold = Param(Params._dummy(),
            "centeredThreshold", "centeredThreshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)

您可以按照以下方式创建基本的Estimator

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable 
from pyspark import keyword_only  

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)        
        
    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma, 
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())


class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)  

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)           

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)    

感谢Benjamin-Manns提供的DefaultParamsReadable, DefaultParamsWritable的用法,这在PySpark版本>=2.3.0可用。

最后,可以按以下方式使用:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+

谢谢!那么,估计器的状态也被视为一个参数吗? - Hanan Shteingart
你是指将估计器的调整参数作为模型的参数吗?如果是这样,这种设计方式很方便,但对于基本实现来说并不是硬性要求。 - zero323
好的,有没有关于如何持久化这样的自定义步骤的建议呢? - Evan Zamir
1
这是一个非常有用的例子。但是,如果您的transformer/model具有特定于其而不是估计器的参数,该怎么办?当它是管道中的一个阶段时,如何将这些参数传递给模型?我不想首先将这些参数传递到估计器中,因为它们与估计器无关。我在这里提出了这个问题... - snark
1
谢谢@zero323 - 这个有更新吗?我讨厌这种语法,需要从每个参数继承(Estimator不是一个参数,所以不应该继承它...) - Hanan Shteingart
1
@HananShteingart EstimatorTransform以及其他构造(如Evaluators)都是Params(issubclass(Estimator,Params) - 注意Param是相关但不同的实体的复数形式),任何内置的Estimator都是Params的子类。这是Scala API的直接翻译,其遵循相同的结构(请参见已知子类)。这与MLRead [er | able] / MLWriter [er | able]接口以及Pipeline API的设计直接相关(主要是设置器和获取器)。 - 10465355

0

我不同意@Shteingarts的解决方案,因为他在类级别上创建成员甚至将它们与实例混合在一起。如果您创建多个HasMean实例,将会导致问题。为什么不使用我认为正确的实例变量方法呢?其他代码示例也是如此。

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):
    def __init__(self):
        super(HasMean, self).__init__()
        self.mean = Param(self, "mean", "mean", typeConverter=TypeConverters.toFloat)

    def setMean(self, value):
        return self.set(self.mean, value)

    def getMean(self):
        return self.getOrDefault(self.mean)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接