顺序编程和并行编程解决方案的区别

5
我创建了一个 Python 代码,用于解决组 Lasso 惩罚线性模型。对于那些不习惯使用这些模型的人来说,基本思想是,您将数据集(x)和响应变量(y)作为输入,以及参数值(lambda1),改变该参数值会改变模型的解。因此,我决定使用 multiprocessing 库并解决不同的模型(与不同的参数值相关联)。我创建了一个名为“model.py”的 python 文件,并包含以下函数:
# -*- coding: utf-8 -*-
from __future__ import division
import functools
import multiprocessing as mp
import numpy as np
from cvxpy import *

def lm_gl_preprocessing(x, y, index, lambda1=None):
    lambda_vector = [lambda1]
    m = x.shape[1]
    n = x.shape[0]
    lambda_param = Parameter(sign="positive")
    m = m+1
    index = np.append(0, index)
    x = np.c_[np.ones(n), x]
    group_sizes = []
    beta_var = []
    unique_index = np.unique(index)
    for idx in unique_index:
        group_sizes.append(len(np.where(index == idx)[0]))
        beta_var.append(Variable(len(np.where(index == idx)[0])))
    num_groups = len(group_sizes)
    group_lasso_penalization = 0
    model_prediction = x[:, np.where(index == unique_index[0])[0]] * beta_var[0]
    for i in range(1, num_groups):
        model_prediction += x[:, np.where(index == unique_index[i])[0]] * beta_var[i]
        group_lasso_penalization += sqrt(group_sizes[i]) * norm(beta_var[i], 2)
    lm_penalization = (1.0/n) * sum_squares(y - model_prediction)
    objective = Minimize(lm_penalization + (lambda_param * group_lasso_penalization))
    problem = Problem(objective)
    response = {'problem': problem, 'beta_var': beta_var, 'lambda_param': lambda_param, 'lambda_vector': lambda_vector}
    return response

def solver(problem, beta_var, lambda_param, lambda_vector):
    beta_sol_list = []
    for i in range(len(lambda_vector)):
        lambda_param.value = lambda_vector[i]
        problem.solve(solver=ECOS)
        beta_sol = np.asarray(np.row_stack([b.value for b in beta_var])).flatten()
        beta_sol_list.append(beta_sol)
    return beta_sol_list

def parallel_solver(problem, beta_var, lambda_param, lambda_vector):
    # Divide parameter vector into chunks to be executed in parallel
    num_chunks = mp.cpu_count()
    chunks = np.array_split(lambda_vector, num_chunks)
    # Solve problem in parallel
    pool = mp.Pool(num_chunks)
    global_results = pool.map(functools.partial(solver, problem, beta_var, lambda_param), chunks)
    pool.close()
    pool.join()
    return global_results
  • 函数lm_gl_preprocessing基本上使用cvxpy模块定义要解决的模型。
  • 函数solver从前一个函数中获取模型详细信息,并解决导致模型最终解决方案的优化问题。
  • 函数parallel_solver使用多进程对求解器函数进行并行化处理。

如果在Python控制台中开始运行并行求解器,则会得到一个解决方案。该解决方案与顺序求解器提供的解决方案不同。 如果我重新启动Python控制台并开始运行顺序求解器,然后运行并行求解器,则并行求解器会给出与顺序求解器相同的解决方案。我将展示:

from __future__ import division
from sklearn.datasets import load_boston
import numpy as np
import model as t

boston = load_boston()
x = boston.data
y = boston.target
index = np.array([1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5])

lambda1 = 1e-3

r1 = t.lm_gl_preprocessing(x=x, y=y, index=index, lambda1=lambda1)
s_parallel_1 = t.parallel_solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
print(s_parallel_1)
[[array([  4.61648376e+01,  -1.22394832e-04,   0.00000000e+00,
       0.00000000e+00,   1.37065733e-04,   1.51910696e-03,
       0.00000000e+00,   1.51910696e-03,   0.00000000e+00,
       7.00079603e-03,   1.52776114e-03,  -8.67357376e-01,
       7.16429750e-03,  -8.67357376e-01])], [], [], []]
s_1 = t.solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
print(s_1)
[array([  3.62813738e+01,  -1.06995338e-01,   4.64210526e-02,
      1.97112192e-02,   2.68475527e+00,  -1.75142155e+01,
      3.80741843e+00,   5.14842823e-04,  -1.47105323e+00,
      3.04949407e-01,  -1.23508259e-02,  -9.50143293e-01,
      9.40708993e-03,  -5.25758097e-01])]
#####################################################
r1 = t.lm_gl_preprocessing(x=x, y=y, index=index, lambda1=lambda1)
s_1 = t.solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
print(s_1)
[array([  3.62813738e+01,  -1.06995338e-01,   4.64210526e-02,
      1.97112192e-02,   2.68475527e+00,  -1.75142155e+01,
      3.80741843e+00,   5.14842823e-04,  -1.47105323e+00,
      3.04949407e-01,  -1.23508259e-02,  -9.50143293e-01,
      9.40708993e-03,  -5.25758097e-01])]
s_parallel_1 = t.parallel_solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
print(s_parallel_1)
[[array([  3.62813738e+01,  -1.06995338e-01,   4.64210526e-02,
       1.97112192e-02,   2.68475527e+00,  -1.75142155e+01,
       3.80741843e+00,   5.14842823e-04,  -1.47105323e+00,
       3.04949407e-01,  -1.23508259e-02,  -9.50143293e-01,
       9.40708993e-03,  -5.25758097e-01])], [], [], []]

PS:我知道在这个例子中,我只是使用并行编程来解决一个可能的参数值的模型,但这只是一个小例子,旨在展示顺序和并行编程所提供的解决方案的差异。由于我在这里完全迷失了,请提供任何提示。


parallel_solver的输出中,我看到除了一个进程外,所有进程都返回空列表。因此,我猜只有一个进程实际上在执行任务。不知道模型代码的情况下很难回答。我猜测当您调用solver时,一些参数(例如problem)会被修改。因此,如果您在solver之后调用parallel_solver,则会传递修改后的参数,因此结果会有所不同。 - Amedeo
1
看到你的测试结果,我也进行了复现并得到了正确的结果。请尝试运行我的代码并检查结果。如果结果不同,我唯一的猜测是你使用了一些过时的库,可能更新可以解决问题。例如,为了运行你的代码,我不得不将cvxpy降级到v0.4。最新版本(1.0)对于“Parameter()”有一个不同的参数(nonneg=True而不是sign="positive")。 - Amedeo
@Amedeo 我只是好奇,你是在 Linux 还是 Windows 上使用 Python?我正在尝试在 Windows 机器上升级 Miniconda 中的 cvxpy,但它只显示 0.4 版本。 - Álvaro Méndez Civieta
1
我正在Linux上运行它。我已经更新了我的帖子,包括环境信息。我还会在Windows上进行测试以进行双重检查。 - Amedeo
1
使用Windows+Anaconda+cvxpy0.4时,我也看到了错误的输出。在Anaconda中,我可以升级到cvxpy1.0,但这需要对代码进行一些更改。 - Amedeo
显示剩余2条评论
1个回答

1
如果我执行你的代码,所有情况下都会得到相同的结果。这是我正在运行的代码(我合并了两个文件):
from __future__ import division
import functools
import multiprocessing as mp
import numpy as np
from cvxpy import *
from sklearn.datasets import load_boston

def lm_gl_preprocessing(x, y, index, lambda1=None):
    lambda_vector = [lambda1]
    m = x.shape[1]
    n = x.shape[0]
    lambda_param = Parameter(sign="positive")
    m = m+1
    index = np.append(0, index)
    x = np.c_[np.ones(n), x]
    group_sizes = []
    beta_var = []
    unique_index = np.unique(index)
    for idx in unique_index:
        group_sizes.append(len(np.where(index == idx)[0]))
        beta_var.append(Variable(len(np.where(index == idx)[0])))
    num_groups = len(group_sizes)
    group_lasso_penalization = 0
    model_prediction = x[:, np.where(index == unique_index[0])[0]] * beta_var[0]
    for i in range(1, num_groups):
        model_prediction += x[:, np.where(index == unique_index[i])[0]] * beta_var[i]
        group_lasso_penalization += sqrt(group_sizes[i]) * norm(beta_var[i], 2)
    lm_penalization = (1.0/n) * sum_squares(y - model_prediction)
    objective = Minimize(lm_penalization + (lambda_param * group_lasso_penalization))
    problem = Problem(objective)
    response = {'problem': problem, 'beta_var': beta_var, 'lambda_param': lambda_param, 'lambda_vector': lambda_vector}
    return response

def solver(problem, beta_var, lambda_param, lambda_vector):
    beta_sol_list = []
    for i in range(len(lambda_vector)):
        lambda_param.value = lambda_vector[i]
        problem.solve(solver=ECOS)
        beta_sol = np.asarray(np.row_stack([b.value for b in beta_var])).flatten()
        beta_sol_list.append(beta_sol)
    return beta_sol_list

def parallel_solver(problem, beta_var, lambda_param, lambda_vector):
    # Divide parameter vector into chunks to be executed in parallel
    num_chunks = mp.cpu_count()
    chunks = np.array_split(lambda_vector, num_chunks)
    # Solve problem in parallel
    pool = mp.Pool(num_chunks)
    global_results = pool.map(functools.partial(solver, problem, beta_var, lambda_param), chunks)
    pool.close()
    pool.join()
    return global_results

if __name__ == "__main__":
     boston = load_boston()
     x = boston.data
     y = boston.target
     index = np.array([1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5])

     lambda1 = 1e-3

     r1 = lm_gl_preprocessing(x=x, y=y, index=index, lambda1=lambda1)
     s_parallel_1 = parallel_solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
     print(s_parallel_1)
     r1 = lm_gl_preprocessing(x=x, y=y, index=index, lambda1=lambda1)
     s_1 = solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
     print(s_1)
     print ("#####################################################")
     r1 = lm_gl_preprocessing(x=x, y=y, index=index, lambda1=lambda1)
     s_1 = solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
     print(s_1)
     r1 = lm_gl_preprocessing(x=x, y=y, index=index, lambda1=lambda1)
     s_parallel_1 = parallel_solver(problem=r1['problem'], beta_var=r1['beta_var'], lambda_param=r1['lambda_param'], lambda_vector=r1['lambda_vector'])
     print(s_parallel_1)

和输出:

[[array([ 3.62813738e+01, -1.06995338e-01,  4.64210526e-02,  1.97112192e-02,
        2.68475527e+00, -1.75142155e+01,  3.80741843e+00,  5.14842823e-04,
       -1.47105323e+00,  3.04949407e-01, -1.23508259e-02, -9.50143293e-01,
        9.40708993e-03, -5.25758097e-01])], [], [], []]
[array([ 3.62813738e+01, -1.06995338e-01,  4.64210526e-02,  1.97112192e-02,
        2.68475527e+00, -1.75142155e+01,  3.80741843e+00,  5.14842823e-04,
       -1.47105323e+00,  3.04949407e-01, -1.23508259e-02, -9.50143293e-01,
        9.40708993e-03, -5.25758097e-01])]
#####################################################
[array([ 3.62813738e+01, -1.06995338e-01,  4.64210526e-02,  1.97112192e-02,
        2.68475527e+00, -1.75142155e+01,  3.80741843e+00,  5.14842823e-04,
       -1.47105323e+00,  3.04949407e-01, -1.23508259e-02, -9.50143293e-01,
        9.40708993e-03, -5.25758097e-01])]
[[array([ 3.62813738e+01, -1.06995338e-01,  4.64210526e-02,  1.97112192e-02,
        2.68475527e+00, -1.75142155e+01,  3.80741843e+00,  5.14842823e-04,
       -1.47105323e+00,  3.04949407e-01, -1.23508259e-02, -9.50143293e-01,
        9.40708993e-03, -5.25758097e-01])], [], [], []]

如您所见,我有相同数量的CPU(4)。

我的环境是在Linux上的Python2.7,并且这些是相关软件包的版本:

>>> import sklearn
>>> sklearn.__version__
'0.19.2'
>>> import scipy
>>> scipy.__version__
'1.1.0'
>>> import numpy 
>>> numpy.__version__
'1.15.2'
>>> import cvxpy
>>> cvxpy.__version__
'0.4.0'
>>> import multiprocessing
>>> multiprocessing.__version__
'0.70a1'

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接