如果我必须猜测您代码的主要问题,我会说在将
input_rows
传递给进程函数
insert()
时存在问题 -
multiprocessing.Pool.apply_async()
的工作方式是解包传递给它的参数,因此您的
insert()
实际上会检索到100个参数,而不是一个具有100个元素列表的参数。这会导致直接在进程函数开始之前出现错误。如果您将调用更改为
pool.apply_async(insert, [input_rows])
,它可能会开始工作...您还将击败迭代器的目的,并且您可能会将整个输入迭代器转换为列表并将100的切片提供给
multiprocessing.Pool.map()
。
但是您问是否有一种“更好”的方法来做到这一点。虽然“更好”是一个相对类别,在理想情况下,
multiprocessing.Pool
带有一个方便的
imap()
(和
imap_unordered()
)方法,旨在以惰性方式消耗可迭代对象并将它们分配到所选池中(因此在处理之前不会遍历整个迭代器),因此您需要构建的仅是迭代器切片并将其传递给它进行处理,即:
import arcpy
import itertools
import multiprocessing
def iterator_slice(iterator, length):
iterator = iter(iterator)
while True:
res = tuple(itertools.islice(iterator, length))
if not res:
break
yield res
def insert(rows):
with arcpy.da.InsertCursor("c:\temp2.gdb\test" fields=["*"]) as i_cursor:
for row in rows:
i_cursor.insertRow(row)
if __name__ == "__main__":
with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
pool = multiprocessing.Pool(processes=4)
for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
pass
pool.close()
(顺便说一下,您的代码对于不是100的倍数的所有大小都不会给出最后一片)
但是......如果它按照广告所说的那样工作,那将是很棒的。虽然这些年来已经修复了很多问题,但是imap_unordered()
在生成自己的迭代器时仍会取得大样本您的迭代器(比实际池进程数量要大得多),因此,如果这是一个问题,您必须亲手动手,并且您正在正确的轨道上-apply_async()
是控制如何提供池的方式,您只需要确保适当地提供您的池:
if __name__ == "__main__":
with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
pool = multiprocessing.Pool(processes=4)
cursor_iterator = iterator_slice(s_cursor, 100)
queue = []
while cursor_iterator or queue:
try:
queue.append(pool.apply_async(insert, [next(cursor_iterator)]))
except (StopIteration, TypeError):
cursor_iterator = None
while queue and (len(queue) >= pool._processes or not cursor_iterator):
process = queue.pop(0)
process.wait(0.1)
if not process.ready():
queue.append(process)
else:
pass
pool.close()
现在,只有当需要下一批100个结果时(即您的insert()
处理函数正常退出或异常退出),才会调用您的s_cursor
迭代器。
更新 - 先前发布的代码在结束时关闭队列中存在错误,如果需要捕获结果,则此代码应该可以很好地完成工作。我们可以使用一些模拟函数轻松测试它:
import random
import time
def get_counter(limit=100):
for i in range(limit):
if not i % 3:
print("Generated: {}".format(i))
yield i
def test_process(values):
time_to_wait = 1 + random.random() * 5
print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
time.sleep(time_to_wait)
print("Processed: {}".format(values))
现在我们可以将它们交织在一起,如下所示:
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=2)
count = get_counter(30)
count_iterator = iterator_slice(count, 7)
queue = []
while count_iterator or queue:
try:
queue.append(pool.apply_async(test_process, [next(count_iterator)]))
except (StopIteration, TypeError):
count_iterator = None
while queue and (len(queue) >= pool._processes or not count_iterator):
process = queue.pop(0)
process.wait(0.1)
if not process.ready():
queue.append(process)
else:
pass
pool.close()
结果是(当然,它会因系统而异):
Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)
验证我们的生成器/迭代器仅在池中有空闲插槽可执行工作时才用于收集数据,确保最小化内存使用(和/或I/O负载,如果您的迭代器最终执行该操作)。您不会得到比这更简洁的东西了。唯一可以获得的额外速度提升是减少等待时间(但主进程将会消耗更多资源),并增加允许的queue
大小(以内存为代价),它被锁定为上述代码中的进程数 - 如果您使用while queue and (len(queue) >= pool._processes + 3 or not count_iterator):
,它将加载3个以上的迭代器切片,在进程结束并且池中的插槽释放时缩短延迟。