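I am new to dask, and it is great to find a module that makes parallelization this easy. I am working on a project where I was able to parallelize a loop on a single machine, as you can see here. However, I would like to move to dask.distributed. I applied the following changes to the class above: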
diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
 from .cutoff import Cosine
 from collections import OrderedDict
 import dask
-import dask.multiprocessing
+from dask.distributed import Client
 import time
@@ -141,13 +141,14 @@ class Gaussian(object):
         for image in images.items():
             computations.append(self.fingerprints_per_image(image))
+        client = Client()
         if self.scaler is None:
-            feature_space = dask.compute(*computations, scheduler='processes',
+            feature_space = dask.compute(*computations, scheduler='distributed',
                                          num_workers=self.cores)
             feature_space = OrderedDict(feature_space)
         else:
             stacked_features = dask.compute(*computations,
-                                            scheduler='processes',
+                                            scheduler='distributed',
                                             num_workers=self.cores)
             stacked_features = numpy.array(stacked_features)
Doing so raises the following error:
File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
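I have tried different ways of adding if __name__ == '__main__':, but none of them worked. You can reproduce the problem by running this example. I would appreciate it if someone could help me figure this out. I have no idea how I should change my code to make it work.
Thanks.
Edit: The example is cu_training.py.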
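For reference, the idiom that the error message points to can be sketched as follows. This is only a minimal sketch under assumptions, not the mlchem code: `fingerprint` and `build_feature_space` are hypothetical stand-ins for `fingerprints_per_image` and the method shown in the diff, and it assumes the `Client` is created once in the entry script (for example cu_training.py) behind the `__main__` guard rather than inside the class method.

```python
import dask
from dask.distributed import Client


@dask.delayed
def fingerprint(image_id):
    # Hypothetical stand-in for fingerprints_per_image: returns a (key, value) pair.
    return image_id, image_id ** 2


def build_feature_space(image_ids):
    # Build the lazy task list, then compute it on whichever scheduler is
    # currently active (the distributed one once a Client has been created).
    computations = [fingerprint(i) for i in image_ids]
    return dict(dask.compute(*computations))


if __name__ == "__main__":
    # Creating the Client only under this guard means that worker processes
    # which re-import the main module do not try to start a cluster themselves.
    client = Client(n_workers=2, threads_per_worker=1)
    try:
        print(build_feature_space(range(4)))
    finally:
        client.close()
```

With a default `Client` registered, `dask.compute` should not need the `scheduler=` or `num_workers=` arguments from the diff; the number of workers is set on the `Client` instead.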