An attempt has been made to start a new process before the current process has finished its bootstrapping phase


I am new to dask, and I found it great to have a module that makes parallelization so easy. I am working on a project where I was able to parallelize a loop on a single machine, as you can see here. However, I would like to move over to dask.distributed. I applied the following changes to the class above:

diff --git a/mlchem/fingerprints/gaussian.py b/mlchem/fingerprints/gaussian.py
index ce6a72b..89f8638 100644
--- a/mlchem/fingerprints/gaussian.py
+++ b/mlchem/fingerprints/gaussian.py
@@ -6,7 +6,7 @@ from sklearn.externals import joblib
 from .cutoff import Cosine
 from collections import OrderedDict
 import dask
-import dask.multiprocessing
+from dask.distributed import Client
 import time


@@ -141,13 +141,14 @@ class Gaussian(object):
         for image in images.items():
             computations.append(self.fingerprints_per_image(image))

+        client = Client()
         if self.scaler is None:
-            feature_space = dask.compute(*computations, scheduler='processes',
+            feature_space = dask.compute(*computations, scheduler='distributed',
                                          num_workers=self.cores)
             feature_space = OrderedDict(feature_space)
         else:
             stacked_features = dask.compute(*computations,
-                                            scheduler='processes',
+                                            scheduler='distributed',
                                             num_workers=self.cores)

             stacked_features = numpy.array(stacked_features)

Doing so raises the following error:

 File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

I have tried different ways of adding if __name__ == '__main__':, all without success. You can reproduce this by running this example. I would appreciate any help with this; I don't know how I should change my code to make it work.

Thanks.

Edit: the example is cu_training.py.

2 Answers

The Client command starts up new processes, so it must live inside an if __name__ == '__main__': block, as described in this SO question or in this GitHub issue.
The same is true of the multiprocessing module.
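
For reference, here is a minimal sketch of that idiom (hypothetical function names, not the actual mlchem code): the Client, and therefore all worker processes, is only created when the file runs as a script, never when a child process re-imports it.

import dask
from dask.distributed import Client

def slow_square(x):
    # stand-in for the per-image fingerprint computation in the question
    return x ** 2

if __name__ == '__main__':
    client = Client()  # starts local worker processes
    computations = [dask.delayed(slow_square)(i) for i in range(10)]
    # once a Client exists, dask.compute uses the distributed scheduler
    results = dask.compute(*computations)
    print(results)
    client.close()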

Thanks, @MRocklin. I have read the links you sent in your answer. However, I have not yet found a way to change my code so that it works. - muammar
I finally understood what you meant, @MRocklin. I fixed it here: https://github.com/muammar/mlchem/commit/c4ab997ccab30c9aa3a5943cf4b205f9e04885e4 I will try to refactor my code, because I don't love having to write a function just to run the computations, but maybe that is simply the intended way of using distributed. Not sure yet. By the way, it is a great tool. - muammar
The answer you point to starts with "On Windows". Why do I run into this problem on Linux? I don't hit it when I do multiprocessing through the concurrent.futures module... - jammertheprogrammer


Even after including the main if __name__ == '__main__': guard, there were still some issues in my code.

I use several Python files and modules, and only one function performs multiprocessing for some sampling operations. The only fix that worked was to put the guard in the very first file, on the very first line of the whole program, with everything inside it, including the imports. The following works well:

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])
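
For comparison, here is a minimal, self-contained reduction of the same guard idiom using only the standard library (the function and names are hypothetical, not part of the code above):

import multiprocessing as mp

# The worker stays at module level: under the 'spawn' start method (the
# default on Windows and, since Python 3.8, on macOS) child processes
# re-import this file and must be able to find the function by name.
def sample(seed):
    return seed * seed  # stand-in for the real sampling work

if __name__ == '__main__':
    # Everything that creates processes (and, as in the answer above, any
    # import whose side effects must not run again in the children) goes
    # under the guard, so a re-import of this file stays inert.
    with mp.Pool(processes=4) as pool:
        print(pool.map(sample, range(8)))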
