Populating a numpy array with multiprocessing via concurrent.futures

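I'd like to use multiprocessing to populate a large numpy array. I've worked through the concurrent futures examples in the documentation, but I don't understand them well enough yet to adapt them to my use case.
Here is a simplified version of what I need to do: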
import numpy
import concurrent.futures

squares = numpy.empty((20, 2))

def make_square(i, squares):
    print('iteration', i)
    squares[i, 0], squares[i, 1] = i, i ** 2

with concurrent.futures.ProcessPoolExecutor(2) as executor: 
    for i in range(20):
        executor.submit(make_square, i, squares)

The output is:
iteration 1
iteration 0
iteration 2
iteration 3
iteration 5
iteration 4
iteration 6
iteration 7
iteration 8
iteration 9
iteration 10
iteration 11
iteration 12
iteration 13
iteration 15
iteration 14
iteration 16
iteration 17
iteration 18
iteration 19

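This nicely shows that the function is running concurrently. But the squares array is still empty.

What is the correct syntax for populating the squares array?

Second, would using .map be a better way to do this?

Thanks in advance!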

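August 2, 2017: Wow. Since nobody had solved this here yet, I went over to reddit-land. Glad to be back on stackoverflow. Thanks to @ilia w495 nikitin and @donkopotamus. Below is what I posted on reddit, which explains the background of the problem in more detail.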

The posted code is an analogy of what I'm trying to do, which is populating 
a numpy array with a relatively simple calculation (dot product) involving 
two other arrays. The algorithm depends on a value N which can be anything 
from 1 on up, though we won't likely use a value larger than 24.

I'm currently running the algorithm on a distributed computing system and  
the N = 20 versions take longer than 10 days to complete. I'm using dozens 
of cores to obtain the required memory, but gaining none of the benefits of 
multiple CPUs. I've rewritten the code using numba which makes lower N 
variants superfast on my own laptop which can't handle the memory 
requirements for larger Ns, but alas, our distributed computing environment 
is not currently able to install numba. So I'm attempting concurrent.futures 
to take advantage of the multiple CPUs in our computing environment in the 
hopes of speeding things up.

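So the time-consuming part is not the calculation itself but the 16+ million iterations. In the real code the initialized array is N x 2 ** N, so for N = 24 the loop runs over range(16777216).

Perhaps an array simply can't be populated via multiprocessing.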


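Your squares array is empty because you are trying to modify it in separate processes. - donkopotamus
@zazizoma It's not populating but initializing. There is another paradigm: shared data structures should be immutable. I think you should split the array into C parts, where C is the number of CPUs, and process each part on a separate CPU (process). Then concatenate all the parts and you get the result you want. In some cases this isn't applicable; it depends on your algorithm. Transferring data between processes also has its own cost. For example, here is my attempt with pymp: https://gist.github.com/w495/6d3cd6a715e3098a3a10a0479d9fbb03 With concurrent.futures it would be easier. - Ilia w495 Nikitin
Great. I'll also look at running the dot products on multiple CPUs while keeping the iteration linear. That might help. Thank you very much for the guidance. - zazizoma
@zazizoma Here is another part of my project, where I implemented partitioning with concurrent.futures.ProcessPoolExecutor: https://gist.github.com/w495/82f7b21509a69a0d70e18f2e4ddf5ed9 I think it may help you too. - Ilia w495 Nikitin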
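A minimal sketch of the split-and-concatenate approach suggested in the comments above, assuming the per-row work can be done independently on a block of row indices. `fill_chunk` and `fill_array` are hypothetical names, and the `(i, i ** 2)` rows stand in for the real dot-product calculation. Each worker receives one contiguous chunk of indices (one chunk per CPU by default), returns its partial result, and the parent concatenates the parts in order:

```python
import concurrent.futures
import os

import numpy


def fill_chunk(rows):
    """Hypothetical worker: compute the output rows for one chunk of indices.

    Stands in for the real per-row calculation; here each row is (i, i ** 2).
    """
    rows = numpy.asarray(rows)
    return numpy.column_stack((rows, rows ** 2))


def fill_array(n_rows, n_workers=None):
    n_workers = n_workers or os.cpu_count() or 1
    # Split the row indices into one contiguous chunk per worker (C parts).
    chunks = numpy.array_split(numpy.arange(n_rows), n_workers)
    with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
        # map() returns the partial results in chunk order.
        parts = list(executor.map(fill_chunk, chunks))
    # Concatenate the partial results into the final array.
    return numpy.concatenate(parts)


if __name__ == '__main__':
    squares = fill_array(20, n_workers=2)
    print(squares.shape)  # (20, 2)
```

Because each task covers a whole chunk rather than a single row, the cost of pickling arguments and results is paid C times instead of once per row, which matters when there are millions of rows.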
2 Answers

The problem is that ProcessPoolExecutor runs the function in separate processes.

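Because these are separate processes with separate memory spaces, and each worker receives a pickled copy of the arguments, you can't expect any changes they make to your array (squares) to be reflected in the parent process. So your original array stays unchanged (as you found).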
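The following sketch, using only `numpy` and the standard library (`modify_and_report` and `run_demo` are hypothetical names), shows the effect directly: the worker process gets a pickled copy of the array, so its change shows up in the value it returns but not in the parent's array.

```python
import concurrent.futures

import numpy


def modify_and_report(squares):
    # Runs in a worker process on a pickled *copy* of the parent's array.
    squares[0] = 99.0
    return squares[0].tolist()


def run_demo():
    squares = numpy.zeros((3, 2))
    with concurrent.futures.ProcessPoolExecutor(1) as executor:
        child_view = executor.submit(modify_and_report, squares).result()
    # The child's change never reaches the parent's array.
    return child_view, squares[0].tolist()


if __name__ == '__main__':
    child_view, parent_view = run_demo()
    print('child saw:', child_view)    # child saw: [99.0, 99.0]
    print('parent has:', parent_view)  # parent has: [0.0, 0.0]
```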

You need to do one of the following:

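  • Use ThreadPoolExecutor, but note that, in general, you still shouldn't try to modify global variables from multiple threads
  • Restructure your code so that your processes/threads do some (expensive) computation and return the result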
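For the first option, a minimal sketch with `ThreadPoolExecutor` (the helper `fill_with_threads` is a hypothetical name). Threads share the parent's memory, so writes into `squares` are visible afterwards; it is reasonably safe here only because each task writes to its own row and no element is written by two threads. In standard CPython builds the GIL means pure-Python work like this will not run faster on threads, so this mainly helps when the per-row work happens inside NumPy operations that release the GIL.

```python
import concurrent.futures

import numpy


def make_square(i, squares):
    # Threads share the parent's memory, so this write is visible to the caller.
    # Each task touches only row i, so no two threads write the same element.
    squares[i, 0], squares[i, 1] = i, i ** 2


def fill_with_threads(n_rows, n_workers=2):
    squares = numpy.empty((n_rows, 2))
    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        futures = [executor.submit(make_square, i, squares) for i in range(n_rows)]
        for future in futures:
            future.result()  # re-raises any exception from a worker thread
    return squares


if __name__ == '__main__':
    print(fill_with_threads(20)[-1].tolist())  # [19.0, 361.0]
```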

The code for the latter would look like this:

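import concurrent.futures

import numpy

def make_square(i):
    print('iteration', i)

    # compute expensive data here ...

    # return row number and the computed data
    return i, [i, i ** 2]

if __name__ == '__main__':
    squares = numpy.zeros((20, 2))
    # The workers only compute and return; the parent process writes each
    # returned row into its own array.
    with concurrent.futures.ProcessPoolExecutor(2) as executor:
        for row, result in executor.map(make_square, range(20)):
            squares[row] = result
    print(squares)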

This gives the result you expect:
[[   0.    0.]
 [   1.    1.]
 [   2.    4.]
 ...
 [  18.  324.]
 [  19.  361.]]

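But what's the reason for reusing this (squares) variable? It doesn't guarantee lower memory usage. Also, sending unnecessary data back through the processes isn't a good idea. In this case you can get "row" with enumerate. - Ilia w495 Nikitin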
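A sketch of that suggestion, assuming the worker returns only the computed data (`fill_with_map` is a hypothetical helper). `Executor.map` yields results in input order, so `enumerate` recovers the row number in the parent and the index never has to travel between processes. The optional `chunksize` argument of `ProcessPoolExecutor.map` batches several inputs into one task:

```python
import concurrent.futures

import numpy


def make_square(i):
    # Returns only the computed data; the row number is not sent back.
    return [i, i ** 2]


def fill_with_map(n_rows, n_workers=2, chunksize=1):
    squares = numpy.zeros((n_rows, 2))
    with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
        results = executor.map(make_square, range(n_rows), chunksize=chunksize)
        # map() yields results in the same order as the inputs, so
        # enumerate() recovers the row index in the parent process.
        for row, result in enumerate(results):
            squares[row] = result
    return squares


if __name__ == '__main__':
    print(fill_with_map(5).tolist())
    # [[0.0, 0.0], [1.0, 1.0], [2.0, 4.0], [3.0, 9.0], [4.0, 16.0]]
```

With millions of rows, a larger `chunksize` (a value to tune, not a recommendation) reduces the per-task communication overhead; with the default of 1, every row is a separate round trip to a worker.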


Here is a good example that I think will help you:

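from concurrent.futures import ProcessPoolExecutor
from time import sleep


def return_after_5_secs(message):
    sleep(5)
    return message


if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)

    future = pool.submit(return_after_5_secs, "hello")
    print(future.done())  # False: the task has just been submitted
    sleep(2)
    print(future.done())  # False: still sleeping
    sleep(2)
    print(future.done())  # False: still sleeping
    sleep(2)
    print(future.done())  # normally True by now (about 6 s have passed)
    print("Result: " + future.result())  # Result: hello
    pool.shutdown()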

A future is just a promise to do something. So this is how I see your code:

import concurrent.futures
import itertools
import os
import time

import numpy

SQUARE_LIST_SIZE = 20


def main():
    # Creates a sequence (generator) of promises (futures).
    future_seq = make_future_seq()

    # Creates a sequence (generator) of computed rows.
    square_seq = make_square_seq(future_seq)

    # Collects the computed rows into a (SQUARE_LIST_SIZE, 2) array
    # in the parent process.
    square_list = numpy.array(list(square_seq))

    return square_list


def make_future_seq():
    """
        Generates a sequence of pending promises (futures).
        Worker processes are started only when tasks are submitted.
    """

    with concurrent.futures.ProcessPoolExecutor(4) as executor:
        for i in range(SQUARE_LIST_SIZE):
            # Only makes a promise to do something.
            future = executor.submit(make_one_square, i)
            print('future ', i, '= >', future)
            yield future


def make_square_seq(future_seq):
    """
        Generates the sequence of fulfilled promises (results).
    """

    # Makes three independent iterators over the same futures.
    for_show_1, for_show_2, future_seq = itertools.tee(future_seq, 3)

    # Check whether the promises are kept yet (they are not).
    # Exhausting for_show_1 also runs make_future_seq to completion, and
    # leaving its `with` block waits for every submitted task to finish.
    for i, future in enumerate(for_show_1):
        print('future ', i, 'done [1] =>', future.done())

    # Keep the promises: result() returns each task's value.
    for future in future_seq:
        yield future.result()

    # Check again: every promise has now been kept.
    for i, future in enumerate(for_show_2):
        print('future ', i, 'done [2] =>', future.done())


def make_one_square(i):
    print('inside [1] = >', i, 'pid = ', os.getpid())
    # Runs in a worker process: compute and return the row instead of
    # writing into a copy of the parent's array, which the parent never sees.
    row = (i, i ** 2)

    time.sleep(1)  # Long and hard computation.

    print('inside [2]= >', i, 'pid = ', os.getpid())
    return row


if __name__ == '__main__':
    main()

That's a lot of text, but it's only there to explain what is going on.

It depends on the case, but many real-world examples only need to call future.result().

See this page: concurrent.futures.html

So this code produces something like the following:

$ python test_futures_1.py 
future  0 = > <Future at 0x7fc0dc758278 state=running>
future  0 done [1] => False
future  1 = > <Future at 0x7fc0dc758da0 state=pending>
inside [1] = > 0 pid =  19364
future  1 done [1] => False
inside [1] = > 1 pid =  19365
future  2 = > <Future at 0x7fc0dc758e10 state=pending>
future  2 done [1] => False
future  3 = > <Future at 0x7fc0dc758cc0 state=pending>
inside [1] = > 2 pid =  19366
future  3 done [1] => False
future  4 = > <Future at 0x7fc0dc769048 state=pending>
future  4 done [1] => False
inside [1] = > 3 pid =  19367
future  5 = > <Future at 0x7fc0dc758f60 state=running>
future  5 done [1] => False
future  6 = > <Future at 0x7fc0dc758fd0 state=pending>
future  6 done [1] => False
future  7 = > <Future at 0x7fc0dc7691d0 state=pending>
future  7 done [1] => False
future  8 = > <Future at 0x7fc0dc769198 state=pending>
future  8 done [1] => False
future  9 = > <Future at 0x7fc0dc7690f0 state=pending>
future  9 done [1] => False
future  10 = > <Future at 0x7fc0dc769438 state=pending>
future  10 done [1] => False
future  11 = > <Future at 0x7fc0dc7694a8 state=pending>
future  11 done [1] => False
future  12 = > <Future at 0x7fc0dc769550 state=pending>
future  12 done [1] => False
future  13 = > <Future at 0x7fc0dc7695f8 state=pending>
future  13 done [1] => False
future  14 = > <Future at 0x7fc0dc7696a0 state=pending>
future  14 done [1] => False
future  15 = > <Future at 0x7fc0dc769748 state=pending>
future  15 done [1] => False
future  16 = > <Future at 0x7fc0dc7697f0 state=pending>
future  16 done [1] => False
future  17 = > <Future at 0x7fc0dc769898 state=pending>
future  17 done [1] => False
future  18 = > <Future at 0x7fc0dc769940 state=pending>
future  18 done [1] => False
future  19 = > <Future at 0x7fc0dc7699e8 state=pending>
future  19 done [1] => False
inside [2]= > 0 pid =  19364
inside [2]= > 1 pid =  19365
inside [1] = > 4 pid =  19364
inside [2]= > 2 pid =  19366
inside [1] = > 5 pid =  19365
inside [1] = > 6 pid =  19366
inside [2]= > 3 pid =  19367
inside [1] = > 7 pid =  19367
inside [2]= > 4 pid =  19364
inside [2]= > 5 pid =  19365
inside [2]= > 6 pid =  19366
inside [1] = > 8 pid =  19364
inside [1] = > 9 pid =  19365
inside [1] = > 10 pid =  19366
inside [2]= > 7 pid =  19367
inside [1] = > 11 pid =  19367
inside [2]= > 8 pid =  19364
inside [2]= > 9 pid =  19365
inside [2]= > 10 pid =  19366
inside [2]= > 11 pid =  19367
inside [1] = > 13 pid =  19366
inside [1] = > 12 pid =  19364
inside [1] = > 14 pid =  19365
inside [1] = > 15 pid =  19367
inside [2]= > 14 pid =  19365
inside [2]= > 13 pid =  19366
inside [2]= > 12 pid =  19364
inside [2]= > 15 pid =  19367
inside [1] = > 16 pid =  19365
inside [1] = > 17 pid =  19364
inside [1] = > 18 pid =  19367
inside [1] = > 19 pid =  19366
inside [2]= > 16 pid =  19365
inside [2]= > 18 pid =  19367
inside [2]= > 17 pid =  19364
inside [2]= > 19 pid =  19366
future  0 done [2] => True
future  1 done [2] => True
future  2 done [2] => True
future  3 done [2] => True
future  4 done [2] => True
future  5 done [2] => True
future  6 done [2] => True
future  7 done [2] => True
future  8 done [2] => True
future  9 done [2] => True
future  10 done [2] => True
future  11 done [2] => True
future  12 done [2] => True
future  13 done [2] => True
future  14 done [2] => True
future  15 done [2] => True
future  16 done [2] => True
future  17 done [2] => True
future  18 done [2] => True
future  19 done [2] => True
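As mentioned above, many real cases only need `future.result()`. A minimal sketch of that pattern for filling the array, with a hypothetical `fill_with_futures` helper: each future is mapped to its row, `as_completed` yields futures in whatever order they finish, and only the parent process writes into the array.

```python
import concurrent.futures

import numpy


def make_square(i):
    # Hypothetical stand-in for the expensive per-row computation.
    return i, i ** 2


def fill_with_futures(n_rows, n_workers=2):
    squares = numpy.zeros((n_rows, 2))
    with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
        # Remember which row each future belongs to.
        future_to_row = {executor.submit(make_square, i): i for i in range(n_rows)}
        # as_completed() yields futures as they finish, in any order;
        # result() returns the value or re-raises the worker's exception.
        for future in concurrent.futures.as_completed(future_to_row):
            squares[future_to_row[future]] = future.result()
    return squares


if __name__ == '__main__':
    print(fill_with_futures(20)[-1].tolist())  # [19.0, 361.0]
```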

Yes, but as I understand it, the original question wasn't about initializing squares. I think the real question was how to use PoolExecutor. - Ilia w495 Nikitin

Content provided by Stack Overflow.