使用Dask或Joblib进行并行Sklearn模型构建

6

我有一组大量的sklearn管道,希望能够使用Dask并行构建。以下是一个简单但幼稚的顺序方法:

from sklearn.naive_bayes import MultinomialNB 
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)

pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])

pipelines = [pipe_nb, pipe_lr, pipe_rf]  # In reality, this would include many more different types of models with varying but specific parameters

for pl in pipelines:
    pl.fit(X_train, Y_train)

请注意,这不是GridSearchCV或RandomSearchCV问题

对于RandomSearchCV,我知道如何使用Dask进行并行化:

dask_client = Client('tcp://some.host.com:8786')  

clf_rf = RandomForestClassifier()
param_dist = {'n_estimators': scipy.stats.randint(100, 500}
search_rf = RandomizedSearchCV(
                clf_rf,
                param_distributions=param_dist, 
                n_iter = 100, 
                scoring = 'f1',
                cv=10,
                error_score = 0, 
                verbose = 3,
               )

with joblib.parallel_backend('dask'):
    search_rf.fit(X_train, Y_train)

然而,我对超参数调整不感兴趣,并且不清楚如何修改此代码以便与Dask并行适配一组具有其自己特定参数的多个不同模型。

1个回答

7

dask.delayed 可能是这里最简单的解决方案。

from sklearn.naive_bayes import MultinomialNB 
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)

pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])

pipelines = [pipe_nb, pipe_lr, pipe_rf]  # In reality, this would include many more different types of models with varying but specific parameters

# Use dask.delayed instead of a for loop.
import dask.delayed

pipelines_ = [dask.delayed(pl).fit(X_train, Y_train) for pl in pipelines]
fit_pipelines = dask.compute(*pipelines_)

这个会自动检测/使用dask.distributed客户端吗?假设执行了dask_client = Client('tcp://some.host.com:8786')这一行代码。还是需要将其包装到joblib.parallel_backend('dask')中? - slaw
这只是使用Dask,而不是joblib,因此您不需要使用joblib.parallel_backend上下文管理器(尽管也不会有害)。 Dask将选择最近创建的客户端并将其用作默认值。 - MRocklin
joblib.parallel_backend 上下文中执行 dask.compute(*pipelines_) 可能会并行化单个 模型 的训练,如果您指定了 n_jobs。如果您尝试并行化单个模型和跨模型处理,我不确定 Dask 会如何处理,但事情可能会顺利进行。 - TomAugspurger
谢谢您的见解!我也学到了一些关于 dask.delayed(pl).fit(X_train, Y_train) 的新知识。它看起来不像文档中的经典 incadd 示例,所以如果没有您的帮助,我可能会遇到一些麻烦。 - slaw
@slaw,我也遇到了类似的问题,并找到了这个答案,帮助我理清了一些事情。只是想知道您是否尝试过使用相同的方法在集群上进行分布式训练,以及dask是否适合此任务。 - Subrat Sahu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接