具有迭代器的多进程池

6

我希望能够使用多进程池和迭代器一起执行一个函数,将迭代器分成N个元素的线程并行执行直到迭代器完成。

import arcpy
from multiprocessing import Pool

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test" fields=["*"]) as i_cursor:
        #i_cursor is an iterator
        for row in rows:
            i_cursor.insertRow(row)

input_rows = []
count = 0
pool = Pool(4)
with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
    #s_cursor is an iterator
    for row in s_cursor:
        if (count < 100):
            input_rows.append(row)
            count += 1
        else:
            #send 100 rows to the insert function in a new thread
            pool.apply_async(insert, input_rows)
            #reset count and input_rows
            count = 1
            input_rows = [row]


pool.join()
pool.close()

我的问题是,这段脚本是否是正确的方法?有更好的方法吗?

可能这个脚本有问题,因为在 pool.join() 处出现了以下断言错误。

Traceback (most recent call last):
  File "G:\Maxime\truncate_append_pool.py", line 50, in <module>
    pool.join()
  File "C:\App\Python27\ArcGIS10.3\lib\multiprocessing\pool.py", line 460, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError

你为什么想要一个迭代器呢?迭代器的唯一目的是不必将整个列表加载到内存中,而是在需要时逐个读取(通过next)。但是,你可以使用迭代器来填充一个队列,然后由你的函数消耗它。也许你可以更详细地解释一下你的目标。 - RaJa
不是说我想要一个迭代器,只是事实上游标就是迭代器,所以也许有人可以利用迭代器的功能来回答这个问题。 - Below the Radar
错误提示说了什么? - John Zwinck
@JohnZwinck 请查看修改。 - Below the Radar
1个回答

11
如果我必须猜测您代码的主要问题,我会说在将input_rows传递给进程函数insert()时存在问题 - multiprocessing.Pool.apply_async()的工作方式是解包传递给它的参数,因此您的insert()实际上会检索到100个参数,而不是一个具有100个元素列表的参数。这会导致直接在进程函数开始之前出现错误。如果您将调用更改为pool.apply_async(insert, [input_rows]),它可能会开始工作...您还将击败迭代器的目的,并且您可能会将整个输入迭代器转换为列表并将100的切片提供给multiprocessing.Pool.map()
但是您问是否有一种“更好”的方法来做到这一点。虽然“更好”是一个相对类别,在理想情况下,multiprocessing.Pool带有一个方便的imap()(和imap_unordered())方法,旨在以惰性方式消耗可迭代对象并将它们分配到所选池中(因此在处理之前不会遍历整个迭代器),因此您需要构建的仅是迭代器切片并将其传递给它进行处理,即:
import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators maximum lazyness is preferred 
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test" fields=["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # lets use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly

(顺便说一下,您的代码对于不是100的倍数的所有大小都不会给出最后一片)

但是......如果它按照广告所说的那样工作,那将是很棒的。虽然这些年来已经修复了很多问题,但是imap_unordered()在生成自己的迭代器时仍会取得大样本您的迭代器(比实际池进程数量要大得多),因此,如果这是一个问题,您必须亲手动手,并且您正在正确的轨道上-apply_async()是控制如何提供池的方式,您只需要确保适当地提供您的池:

if __name__ == "__main__":
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # lets use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convinience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)])) 
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()

现在,只有当需要下一批100个结果时(即您的insert()处理函数正常退出或异常退出),才会调用您的s_cursor迭代器。

更新 - 先前发布的代码在结束时关闭队列中存在错误,如果需要捕获结果,则此代码应该可以很好地完成工作。我们可以使用一些模拟函数轻松测试它:

import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what's passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))
现在我们可以将它们交织在一起,如下所示:
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # lets use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                # you can use process.get() to get the result if needed
                pass
    pool.close()

结果是(当然,它会因系统而异):

Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)

验证我们的生成器/迭代器仅在池中有空闲插槽可执行工作时才用于收集数据,确保最小化内存使用(和/或I/O负载,如果您的迭代器最终执行该操作)。您不会得到比这更简洁的东西了。唯一可以获得的额外速度提升是减少等待时间(但主进程将会消耗更多资源),并增加允许的queue大小(以内存为代价),它被锁定为上述代码中的进程数 - 如果您使用while queue and (len(queue) >= pool._processes + 3 or not count_iterator):,它将加载3个以上的迭代器切片,在进程结束并且池中的插槽释放时缩短延迟。


谢谢你的回答,非常有帮助。我正在尝试使用apply_async的“脏”解决方案,在任务管理器中只能看到一个Python实例在进行CPU计算,并且当作业完成时内存使用情况不会被重置。每次线程执行新作业时,内存使用情况都会增加。这是正常的吗? - Below the Radar
@BelowtheRadar - 请查看更新,之前的代码中有一个小错误,尽管即使使用之前的代码,它也不应该按照你描述的方式运行。我还附加了一个测试用例来证明它可以正常工作。现在,如果在arcpy中存在一些多进程同步或数据缓存问题-那就是arcpy的问题,多进程部分应该按预期工作。 - zwer
干得好! 非常有启发性的答案! - Below the Radar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接