在Julia中并行计算梯度

7

我曾经被说服放弃舒适的Matlab编程并开始使用Julia编程。我长期以来一直在研究神经网络,而我认为,现在有了Julia,通过并行计算梯度,我可以更快地完成任务。

梯度不需要一次性在整个数据集上计算;相反,我们可以将计算分割。例如,通过将数据集分成几部分,我们可以在每个部分上计算局部梯度。然后通过相加这些局部梯度来计算总的梯度。

虽然原则很简单,但当我使用Julia并行化时,会出现性能下降,即一个进程比两个进程更快!我显然做错了什么......我已经参考了论坛中其他人提出的问题,但我仍然无法得到完整的答案。我认为我的问题在于有大量不必要的数据移动,但我无法正确修复它。

为避免发布混乱的神经网络代码,我在下面发布一个更简单的示例,它在线性回归设置中复制了我的问题。

下面的代码块创建了一些用于线性回归问题的数据。代码解释了常量,但X是包含数据输入的矩阵。我们随机创建一个权重向量w,当与X相乘时,可以得到一些目标Y

######################################
## CREATE LINEAR REGRESSION PROBLEM ##
######################################

# This code implements a simple linear regression problem

MAXITER = 100   # number of iterations for simple gradient descent
N = 10000       # number of data items
D = 50          # dimension of data items
X = randn(N, D) # create random matrix of data, data items appear row-wise
Wtrue = randn(D,1) # create arbitrary weight matrix to generate targets
Y = X*Wtrue     # generate targets

下面的代码块定义了用于测量回归适应性(即负对数似然)和权重向量梯度的函数:w:
####################################
##       DEFINE FUNCTIONS         ##
####################################

@everywhere  begin

  #-------------------------------------------------------------------
  function negative_loglikelihood(Y,X,W)
  #-------------------------------------------------------------------

    # number of data items
    N  = size(X,1)
    # accumulate here log-likelihood
    ll = 0
    for nn=1:N
      ll = ll - 0.5*sum((Y[nn,:] - X[nn,:]*W).^2)
    end

    return ll
  end


  #-------------------------------------------------------------------
  function negative_loglikelihood_grad(Y,X,W, first_index,last_index)
  #-------------------------------------------------------------------

    # number of data items
    N  = size(X,1)
    # accumulate here gradient contributions by each data item
    grad = zeros(similar(W))
    for nn=first_index:last_index
      grad = grad +  X[nn,:]' * (Y[nn,:] - X[nn,:]*W)
    end

    return grad
  end


end

请注意,上述函数故意没有向量化!我选择不进行向量化,因为最终的代码(神经网络案例)也不会接受任何向量化(我们不要进一步讨论这个问题)。
最后,下面的代码块显示了一个非常简单的梯度下降,试图从给定的数据Y和X中恢复参数权重向量w:
####################################
##     SOLVE LINEAR REGRESSION    ##
####################################


# start from random initial solution
W = randn(D,1)

# learning rate, set here to some arbitrary small constant
eta = 0.000001

# the following for-loop implements simple gradient descent
for iter=1:MAXITER

  # get gradient
  ref_array = Array(RemoteRef, nworkers())

  # let each worker process part of matrix X
  for index=1:length(workers())

    # first index of subset of X that worker should work on
    first_index       = (index-1)*int(ceil(N/nworkers())) + 1
    # last index of subset of X that worker should work on
    last_index        = min((index)*(int(ceil(N/nworkers()))), N)

    ref_array[index] = @spawn negative_loglikelihood_grad(Y,X,W, first_index,last_index)
  end

  # gather the gradients calculated on parts of matrix X
  grad = zeros(similar(W))
  for index=1:length(workers())
    grad = grad + fetch(ref_array[index])
  end

  # now that we have the gradient we can update parameters W
  W = W + eta*grad;

  # report progress, monitor optimisation
  @printf("Iter %d neg_loglikel=%.4f\n",iter, negative_loglikelihood(Y,X,W))
end

希望显而易见的是,我在这里尝试以最简单的方式并行计算梯度。我的策略是将梯度的计算分成尽可能多的部分,每个工作线程都需要仅处理矩阵X的一部分,该部分由first_indexlast_index指定。因此,每个工作线程应使用X[first_index:last_index,:]进行操作。例如,对于4个工作线程和N = 10000,应按以下方式划分工作:
  • 工作线程1 => first_index = 1, last_index = 2500
  • 工作线程2 => first_index = 2501, last_index = 5000
  • 工作线程3 => first_index = 5001, last_index = 7500
  • 工作线程4 => first_index = 7501, last_index = 10000
不幸的是,如果只有一个工作线程,则整个代码运行速度更快。如果通过addprocs()添加更多工作线程,则代码的运行速度会变慢。例如,可以通过使用N=20000来创建更多数据项,从而加剧此问题。对于更多数据项,退化现象更加明显。在我的计算环境中,当N=20000并且使用一个核心时,代码运行需要约9秒。当N=20000并且使用4个核心时,需要约18秒!
我尝试了许多不同的方法,受到本论坛中的问题和答案的启发,但不幸的是没有任何效果。我意识到并行化方法很幼稚,数据传输可能是问题所在,但我不知道如何正确地解决它。似乎文档在这个问题上也有些匮乏(就像Ivo Balbaert的好书一样)。
我希望您能提供帮助,因为我已经被卡住了很长时间,并且真的需要它来完成我的工作。对于任何想要运行代码的人,为了避免复制粘贴的麻烦,您可以在此处获取代码。
感谢您花费时间阅读这个非常冗长的问题!请帮我将其转化为模型答案,以便任何Julia新手都可以查阅!
2个回答

4
我认为GD算法不适合使用任何提议的方法进行并行化:无论是SharedArray还是DistributedArray,或者自己实现数据块分配的分发方法。
问题不在于Julia语言,而是在GD算法本身。 请考虑以下代码:
主进程:
for iter = 1:iterations #iterations: "the more the better"
    δ = _gradient_descent_shared(X, y, θ) 
    θ = θ -  α * (δ/N)   
end

问题出在上面的for循环中,这是必须的。无论_gradient_descent_shared有多好,总迭代次数都会破坏并行化的崇高概念。
阅读了问题和上述建议后,我开始使用SharedArray实现GD。请注意,我不是SharedArrays领域的专家。
主进程部分(简单实现,没有正则化):
run_gradient_descent(X::SharedArray, y::SharedArray, θ::SharedArray, α, iterations) = begin
  N = length(y)

  for iter = 1:iterations 
    δ = _gradient_descent_shared(X, y, θ) 
    θ = θ -  α * (δ/N)   
  end     
  θ
end

_gradient_descent_shared(X::SharedArray, y::SharedArray, θ::SharedArray, op=(+)) = begin            
    if size(X,1) <= length(procs(X))
        return _gradient_descent_serial(X, y, θ)
    else
        rrefs = map(p -> (@spawnat p _gradient_descent_serial(X, y, θ)), procs(X))
        return mapreduce(r -> fetch(r), op, rrefs)
    end 
end

所有工作人员共用的代码:

#= Returns the range of indices of a chunk for every worker on which it can work. 
The function splits data examples (N rows into chunks), 
not the parts of the particular example (features dimensionality remains intact).=# 
@everywhere function _worker_range(S::SharedArray)
    idx = indexpids(S)
    if idx == 0
        return 1:size(S,1), 1:size(S,2)
    end
    nchunks = length(procs(S))
    splits = [round(Int, s) for s in linspace(0,size(S,1),nchunks+1)]
    splits[idx]+1:splits[idx+1], 1:size(S,2)
end

#Computations on the chunk of the all data.
@everywhere _gradient_descent_serial(X::SharedArray, y::SharedArray, θ::SharedArray)  = begin
    prange = _worker_range(X)

    pX = sdata(X[prange[1], prange[2]])
    py = sdata(y[prange[1],:])

    tempδ = pX' * (pX * sdata(θ) .- py)         
end

数据加载和训练。假设我们有以下内容:
  • X::Array (N,D)中的特征,其中N是样本数,D是特征维度
  • y::Array (N,1)中的标签
主要代码可能如下所示:
X=[ones(size(X,1)) X] #adding the artificial coordinate 
N, D = size(X)
MAXITER = 500
α = 0.01

initialθ = SharedArray(Float64, (D,1))
sX = convert(SharedArray, X)
sy = convert(SharedArray, y)  
X = nothing
y = nothing
gc() 

finalθ = run_gradient_descent(sX, sy, initialθ, α, MAXITER);

在我的Intel Core i7的8核上实现并运行后,我在训练多类(19类)数据(串行GD 715秒/共享GD 665秒)中获得了非常轻微的加速。如果我的实现是正确的(请检查一下 - 我指望你了),那么GD算法的并行化不值得这样做。肯定可以通过在1个核上使用随机GD来获得更好的加速效果。

你好Maciek,非常感谢您的帖子。我仍然为这个问题苦恼。我必须承认我不明白为什么加速GD存在问题。我的意思是,一旦数据块被分发或访问(我不知道设计选择对此有多大影响),不需要相互交互的不同工作人员应该会看到改进(至少比您和我实现的要好)。当然,分发数据需要时间,但从长远来看,这段时间变得微不足道了?我很快就会查看您的实现。干杯。 - user1438310
1
在我看来,由于减少的数量(for-loop),我们并没有加速GD。每次迭代都必须在主进程中进行确认。我已经使用大小为N = 3000,D = 6000的训练集进行了测试。也许如果N更大一些,它会更好地加速。如果您有足够的数据,请检查一下。 - Maciek Leks
嗨Maciek,谢谢你回复。我明白你所说的“每次迭代都必须在主进程中确认”的意思,我开始理解了。我不确定接下来该怎么做,但我很感激你的反馈。谢谢。 - user1438310
我终于接受了上面的答案。虽然它并没有完全解决我的问题,但是它很有帮助,也是我得到的最详尽的回复。 - user1438310

4

如果你想减少数据的移动量,强烈建议使用SharedArrays。你可以预先分配一个输出向量,并将其作为参数传递给每个工作进程。每个工作进程设置其中的一块,正如你所建议的那样。


谢谢你的回答!你介意详细解释一下如何实际使用SharedArrays,或者给我指一个容易理解的例子吗?例如,我甚至无法弄清楚如何将我的现有数据矩阵X转换为SharedArray,或者直接将我的数据加载到SharedArray中。Julia文档展示了构造函数是什么,但这对我没有帮助。谢谢。 - user1438310
我在另一篇帖子中看到你的回答,建议将现有数组转换为SharedArray,如下所示:sharedx = convert(SharedArray,x)。确实非常有用。 - user1438310
我正在尝试使用SharedArray类型,但我不确定如何使用它来将原始矩阵X的块分配给工作进程。我想我需要做更多的事情,而不仅仅是sharedX = convert(SharedArray,X),然后传递sharedX,对吗? - user1438310

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接