我最近也发现,堆叠(stacking)估计器无法处理带样本权重的管道(Pipeline)。我通过继承scikit-learn的StackingRegressor
和StackingClassifier
类并重写其fit()
方法来解决这个问题,以更好地管理管道。请看以下内容:
"""Implement StackingClassifier that can handle sample-weighted Pipelines."""
from sklearn.ensemble import StackingRegressor, StackingClassifier
from copy import deepcopy
import numpy as np
from joblib import Parallel
from sklearn.base import clone
from sklearn.base import is_classifier, is_regressor
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import check_cv
from sklearn.utils import Bunch
from sklearn.utils.fixes import delayed
from sklearn.pipeline import Pipeline
# Conventional name of the final (predictor) step in Pipelines handed to the
# stacking classes in this file; the accompanying notes ask that every such
# Pipeline name its last step to match this value.
ESTIMATOR_NAME_IN_PIPELINE = 'estimator'
def new_fit_single_estimator(estimator, X, y, sample_weight=None,
                             message_clsname=None, message=None):
    """Fit ``estimator`` in place, routing sample weights into Pipelines.

    Parameters
    ----------
    estimator : estimator object
        Object exposing ``fit``. It is fitted in place.
    X, y : array-like
        Training data and targets, handed to ``estimator.fit`` unchanged.
    sample_weight : array-like or None, default=None
        Per-sample weights. When given and ``estimator`` is a ``Pipeline``,
        they are passed as ``<last step name>__sample_weight`` so that they
        reach the final step; for any other estimator they are passed as
        ``sample_weight``.
    message_clsname, message : object, default=None
        Accepted but not used by this function.

    Returns
    -------
    estimator
        The same object that was passed in, after fitting.

    Raises
    ------
    TypeError
        If ``fit`` rejects the ``sample_weight`` keyword; the original
        error is chained. Any other ``TypeError`` propagates unchanged.
    """
    # Unweighted case: nothing to route, fit directly.
    if sample_weight is None:
        estimator.fit(X, y)
        return estimator

    try:
        if isinstance(estimator, Pipeline):
            # Pipeline.fit takes step-scoped keywords, so address the
            # weights to the last step by its name.
            final_step_name = estimator.steps[-1][0]
            weight_kwargs = {
                final_step_name + '__sample_weight': sample_weight
            }
        else:
            weight_kwargs = {'sample_weight': sample_weight}
        estimator.fit(X, y, **weight_kwargs)
    except TypeError as exc:
        # Only translate the "keyword not accepted" failure; everything
        # else is re-raised as is.
        if "unexpected keyword argument 'sample_weight'" not in str(exc):
            raise
        raise TypeError(
            "Underlying estimator {} does not support sample weights."
            .format(estimator.__class__.__name__)
        ) from exc
    return estimator
class FlexibleStackingClassifier(StackingClassifier):
    """Stacking classifier whose base estimators may be weighted Pipelines.

    ``fit`` is overridden so that ``sample_weight`` is forwarded to each base
    estimator in a form it accepts: a ``Pipeline`` receives the weights as
    ``<last step name>__sample_weight``, any other estimator receives them
    as ``sample_weight``. The same routing is used for the full-data fit and
    for the cross-validated predictions that train the final estimator.
    """

    def __init__(self, estimators, final_estimator=None, *, cv=None,
                 n_jobs=None, passthrough=False, verbose=0):
        super().__init__(
            estimators=estimators,
            final_estimator=final_estimator,
            cv=cv,
            n_jobs=n_jobs,
            passthrough=passthrough,
            verbose=verbose
        )

    @staticmethod
    def _cv_fit_params(estimator, sample_weight):
        """Return the ``fit_params`` to cross-validate one base estimator.

        Returns ``None`` when there are no weights. For a ``Pipeline`` the
        key is scoped to the name of its own last step, so the step does not
        have to carry any particular name; other estimators get the plain
        ``sample_weight`` key.
        """
        if sample_weight is None:
            return None
        if isinstance(estimator, Pipeline):
            last_step_name = estimator.steps[-1][0]
            return {f"{last_step_name}__sample_weight": sample_weight}
        return {"sample_weight": sample_weight}

    def fit(self, X, y, sample_weight=None):
        """Fit the base estimators and the final estimator.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Training vectors, where `n_samples` is the number of samples and
            `n_features` is the number of features.
        y : array-like of shape (n_samples,)
            Target class labels.
        sample_weight : array-like of shape (n_samples,), default=None
            Sample weights. If None, samples are equally weighted. When
            given, the weights are passed to every base estimator and to the
            final estimator, so all of them must support sample weights.

        Returns
        -------
        self : object
        """
        # Imported locally so this class does not depend on extra
        # module-level imports.
        from sklearn.preprocessing import LabelEncoder
        from sklearn.utils.multiclass import check_classification_targets

        # Overriding fit() bypasses the label encoding normally done by
        # StackingClassifier.fit, so it is reproduced here; the inherited
        # predict() decodes predictions through this encoder.
        check_classification_targets(y)
        label_encoder = LabelEncoder().fit(y)
        # NOTE(review): the parent class reads the encoder from a private
        # attribute whose name appears to differ between scikit-learn
        # releases (`_le` vs `_label_encoder`); both are set. Confirm against
        # the installed version.
        self._le = label_encoder
        self._label_encoder = label_encoder
        self.classes_ = label_encoder.classes_
        y = label_encoder.transform(y)

        # all_estimators holds both the estimators to fit and any 'drop'
        # placeholders.
        names, all_estimators = self._validate_estimators()
        self._validate_final_estimator()

        stack_method = [self.stack_method] * len(all_estimators)

        # Fit clones of the base estimators on the whole training set.
        self.estimators_ = Parallel(n_jobs=self.n_jobs)(
            delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight)
            for est in all_estimators if est != 'drop'
        )

        # Map each name to its fitted clone, keeping 'drop' markers.
        self.named_estimators_ = Bunch()
        est_fitted_idx = 0
        for name_est, org_est in zip(names, all_estimators):
            if org_est != 'drop':
                self.named_estimators_[name_est] = self.estimators_[
                    est_fitted_idx]
                est_fitted_idx += 1
            else:
                self.named_estimators_[name_est] = 'drop'

        # Every base estimator must see the same folds: pin the splitter's
        # random state once and hand each job its own copy below.
        cv = check_cv(self.cv, y=y, classifier=is_classifier(self))
        if hasattr(cv, 'random_state') and cv.random_state is None:
            cv.random_state = np.random.RandomState()

        self.stack_method_ = [
            self._method_name(name, est, meth)
            for name, est, meth in zip(names, all_estimators, stack_method)
        ]

        # Out-of-fold predictions of each base estimator; fit_params are
        # built per estimator so Pipelines and plain estimators both work.
        predictions = Parallel(n_jobs=self.n_jobs)(
            delayed(cross_val_predict)(
                clone(est), X, y, cv=deepcopy(cv),
                method=meth, n_jobs=self.n_jobs,
                fit_params=self._cv_fit_params(est, sample_weight),
                verbose=self.verbose)
            for est, meth in zip(all_estimators, self.stack_method_)
            if est != 'drop'
        )

        # Keep only the methods of estimators that were actually fitted.
        self.stack_method_ = [
            meth for (meth, est) in zip(self.stack_method_, all_estimators)
            if est != 'drop'
        ]

        # Train the final estimator on the stacked predictions.
        X_meta = self._concatenate_predictions(X, predictions)
        new_fit_single_estimator(self.final_estimator_, X_meta, y,
                                 sample_weight=sample_weight)

        return self
class FlexibleStackingRegressor(StackingRegressor):
    """Stacking regressor whose base estimators may be weighted Pipelines.

    ``fit`` is overridden so that ``sample_weight`` is forwarded to each base
    estimator in a form it accepts: a ``Pipeline`` receives the weights as
    ``<last step name>__sample_weight``, any other estimator receives them
    as ``sample_weight``. The same routing is used for the full-data fit and
    for the cross-validated predictions that train the final estimator.
    """

    def __init__(self, estimators, final_estimator=None, *, cv=None,
                 n_jobs=None, passthrough=False, verbose=0):
        super().__init__(
            estimators=estimators,
            final_estimator=final_estimator,
            cv=cv,
            n_jobs=n_jobs,
            passthrough=passthrough,
            verbose=verbose
        )

    @staticmethod
    def _cv_fit_params(estimator, sample_weight):
        """Return the ``fit_params`` to cross-validate one base estimator.

        Returns ``None`` when there are no weights. For a ``Pipeline`` the
        key is scoped to the name of its own last step, so the step does not
        have to carry any particular name; other estimators get the plain
        ``sample_weight`` key.
        """
        if sample_weight is None:
            return None
        if isinstance(estimator, Pipeline):
            last_step_name = estimator.steps[-1][0]
            return {f"{last_step_name}__sample_weight": sample_weight}
        return {"sample_weight": sample_weight}

    def fit(self, X, y, sample_weight=None):
        """Fit the base estimators and the final estimator.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Training vectors, where `n_samples` is the number of samples and
            `n_features` is the number of features.
        y : array-like of shape (n_samples,)
            Target values.
        sample_weight : array-like of shape (n_samples,), default=None
            Sample weights. If None, samples are equally weighted. When
            given, the weights are passed to every base estimator and to the
            final estimator, so all of them must support sample weights.

        Returns
        -------
        self : object
        """
        # all_estimators holds both the estimators to fit and any 'drop'
        # placeholders.
        names, all_estimators = self._validate_estimators()
        self._validate_final_estimator()

        stack_method = [self.stack_method] * len(all_estimators)

        # Fit clones of the base estimators on the whole training set.
        self.estimators_ = Parallel(n_jobs=self.n_jobs)(
            delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight)
            for est in all_estimators if est != 'drop'
        )

        # Map each name to its fitted clone, keeping 'drop' markers.
        self.named_estimators_ = Bunch()
        est_fitted_idx = 0
        for name_est, org_est in zip(names, all_estimators):
            if org_est != 'drop':
                self.named_estimators_[name_est] = self.estimators_[
                    est_fitted_idx]
                est_fitted_idx += 1
            else:
                self.named_estimators_[name_est] = 'drop'

        # Every base estimator must see the same folds: pin the splitter's
        # random state once and hand each job its own copy below.
        cv = check_cv(self.cv, y=y, classifier=is_classifier(self))
        if hasattr(cv, 'random_state') and cv.random_state is None:
            cv.random_state = np.random.RandomState()

        self.stack_method_ = [
            self._method_name(name, est, meth)
            for name, est, meth in zip(names, all_estimators, stack_method)
        ]

        # Out-of-fold predictions of each base estimator; fit_params are
        # built per estimator so Pipelines and plain estimators both work.
        predictions = Parallel(n_jobs=self.n_jobs)(
            delayed(cross_val_predict)(
                clone(est), X, y, cv=deepcopy(cv),
                method=meth, n_jobs=self.n_jobs,
                fit_params=self._cv_fit_params(est, sample_weight),
                verbose=self.verbose)
            for est, meth in zip(all_estimators, self.stack_method_)
            if est != 'drop'
        )

        # Keep only the methods of estimators that were actually fitted.
        self.stack_method_ = [
            meth for (meth, est) in zip(self.stack_method_, all_estimators)
            if est != 'drop'
        ]

        # Train the final estimator on the stacked predictions.
        X_meta = self._concatenate_predictions(X, predictions)
        new_fit_single_estimator(self.final_estimator_, X_meta, y,
                                 sample_weight=sample_weight)

        return self
我同时提供了回归器和分类器的版本,但您似乎只需要使用分类器子类。
但是请注意:各个管道中的最后一步(估计器)必须使用相同的名称,并且该名称必须与上面定义的ESTIMATOR_NAME_IN_PIPELINE
常量保持一致,否则代码将无法工作。例如,下面使用与类定义脚本中相同的名称定义了一个有效的Pipeline
实例:
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import TweedieRegressor
from sklearn.feature_selection import VarianceThreshold
# Example Pipeline usable as a base estimator: two preprocessing steps
# followed by the predictor, whose step is named 'estimator'.
validly_named_pipeline = Pipeline(steps=[
    ('variance_threshold', VarianceThreshold()),
    ('scaler', StandardScaler()),
    # The final step's name is what sample-weight routing keys on.
    ('estimator', TweedieRegressor()),
])
这并不是理想的方案,但这是我目前能给出的办法,并且应该可以正常工作。
编辑:仅仅为了明确,当我重写fit()
方法时,我只是从Scikit社区中复制并粘贴了代码,并做出了必要的更改,其中只涉及几行代码。因此,大部分粘贴的代码不是我的原创作品,而是Scikit开发人员的。
StackingClassifier
不允许您将拟合参数传递给基学习器。StackingClassifier.fit
仅具有sample_weight
参数,但它会将这些权重传递给_每个_基学习器,这不是您要求的。 无论如何,由于您的基学习器实际上是一个管道,而管道不能直接使用sample_weight
,因此也会出现您报告的错误。 如果您希望所有基学习器都获得样本权重,则我可以想到一些解决方法,但否则,除了复制StackingClassifier.fit
逻辑之外,我看不到简单的修补程序。 - Ben Reiniger