Generally, there is no documentation because, as of Spark 1.6 / 2.0, most of the related API is not intended to be public. It should change in Spark 2.1.0 (see SPARK-7146).
The API is relatively complex because it has to follow specific conventions in order to make a given Transformer or Estimator compatible with the Pipeline API. Some of these methods may be required for features like reading and writing or grid search. Others, like keyword_only, are just simple helpers and not strictly required.
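For illustration, here is a simplified, stdlib-only sketch of what pyspark's keyword_only helper does (the Demo class is hypothetical; the real decorator lives in the pyspark package and is slightly richer): it rejects positional arguments and stashes the passed keyword arguments on the instance as _input_kwargs, which __init__ and setParams then read back.

```python
import functools

def keyword_only(func):
    """Simplified sketch of pyspark's keyword_only decorator: force
    keyword-only calls and stash the kwargs on the instance."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if args:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        self._input_kwargs = kwargs  # read back later by __init__/setParams
        return func(self, **kwargs)
    return wrapper

class Demo(object):
    @keyword_only
    def __init__(self, inputCol=None, threshold=1.0):
        # Mirrors the Estimator pattern below: read the captured kwargs back.
        self.params = dict(self._input_kwargs)

d = Demo(inputCol="x")  # d.params == {"inputCol": "x"}
```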
Assuming you have defined the following mix-in for the mean parameter:
from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean",
                 typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)
the standard deviation parameter:
class HasStandardDeviation(Params):

    standardDeviation = Param(Params._dummy(),
                              "standardDeviation", "standardDeviation",
                              typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)
and the threshold:
class HasCenteredThreshold(Params):

    centeredThreshold = Param(Params._dummy(),
                              "centeredThreshold", "centeredThreshold",
                              typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)
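To make the mixin machinery above less opaque, here is a stdlib-only miniature of the Params base class (names are prefixed with Mini/Sketch to make clear these are not the real pyspark.ml.param classes, which additionally handle copying, _dummy parents and type converters): _set records explicitly set values, _setDefault records defaults, and getOrDefault prefers the former.

```python
class MiniParam(object):
    """A named parameter descriptor, in the spirit of pyspark.ml.param.Param."""
    def __init__(self, name, doc):
        self.name = name
        self.doc = doc

class MiniParams(object):
    """Sketch of pyspark.ml.param.Params: two maps, explicit values win."""
    def __init__(self):
        self._paramMap = {}         # explicitly set values
        self._defaultParamMap = {}  # defaults

    def _set(self, **kwargs):
        for name, value in kwargs.items():
            self._paramMap[getattr(self, name)] = value
        return self

    def _setDefault(self, **kwargs):
        for name, value in kwargs.items():
            self._defaultParamMap[getattr(self, name)] = value
        return self

    def getOrDefault(self, param):
        if param in self._paramMap:
            return self._paramMap[param]
        return self._defaultParamMap[param]

class HasMeanSketch(MiniParams):
    mean = MiniParam("mean", "mean")

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

m = HasMeanSketch()
m._setDefault(mean=0.0)
default_mean = m.getMean()   # falls back to the default map -> 0.0
m.setMean(2.5)
explicit_mean = m.getMean()  # explicit value wins -> 2.5
```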
you could create a basic Estimator as follows:
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark import keyword_only


class NormalDeviation(Estimator, HasInputCol,
                      HasPredictionCol, HasCenteredThreshold,
                      DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma,
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())
class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
                           HasMean, HasStandardDeviation, HasCenteredThreshold,
                           DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                 mean=None, standardDeviation=None,
                 centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                  mean=None, standardDeviation=None,
                  centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()
        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)
Credit to Benjamin-Manns for the use of DefaultParamsReadable and DefaultParamsWritable, available in PySpark >= 2.3.0.
Finally, it could be used as follows:
df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])
normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model = Pipeline(stages=[normal_deviation]).fit(df)
model.transform(df).show()
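As a stdlib-only sanity check of the arithmetic (stddev_samp is the sample standard deviation, the same quantity as Python's statistics.stdev), the model should flag only 99.0 in the toy data above:

```python
import statistics

xs = [2.0, 3.0, 0.0, 99.0]   # the "x" column from the example
mu = statistics.mean(xs)      # 26.0
sigma = statistics.stdev(xs)  # sample standard deviation, ~48.68
threshold = 1.0

# Same predicate as NormalDeviationModel._transform:
flags = [(x - mu) > threshold * sigma for x in xs]
# -> [False, False, False, True]: only 99.0 deviates by more than one sigma
```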
Estimator and Transformer, like other constructs such as Evaluators, are Params (issubclass(Estimator, Params) - note that Params is the plural form of Param, a related but different entity), and any built-in Estimator is a subclass of Params. This is a direct translation of the Scala API, which follows the same structure (see the known subclasses). It is directly related to the MLRead[er|able] / MLWrit[er|able] interfaces as well as the design of the Pipeline API (mostly setters and getters). - 10465355