asyncio中的yield from concurrent.futures.Executor Future

12
我有一个名为long_task的函数,它运行重型的CPU计算。我想通过使用新的asyncio框架将其异步化。生成的long_task_async函数使用ProcessPoolExecutor将工作转移到不受GIL约束的不同进程中。
问题在于,由ProcessPoolExecutor.submit返回的concurrent.futures.Future实例在被yield时会抛出TypeError异常。这是设计上的问题吗?这些future是否与asyncio.Future类不兼容?有什么解决方法?
我还注意到生成器不可序列化,因此将协程提交给ProcessPoolExecutor将失败。是否有任何干净的解决方案?
import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task)) #TypeError: 'Future' object is not iterable
                                                 # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print( n )

loop.run_until_complete(main())
2个回答

12
你想要使用loop.run_in_executor,它使用了concurrent.futures执行器,但将返回值映射到一个asyncio future。
原始的asyncio PEP 建议说,concurrent.futures.Future可能会在未来增加__iter__方法,这样它就可以与yield from一起使用,但目前这个库仅需要yield from支持,而不需要其他更多的支持。(否则有些代码在3.3中将无法正常工作。)

12

我们可以通过调用asyncio.wrap_future(Future)concurrent.futures.Future包装成asyncio.future。我尝试了下面的代码,运行良好

from asyncio import coroutine
import asyncio
from concurrent import futures


def do_something():
    ls = []
    for i in range(1, 1000000):
        if i % 133333 == 0:
            ls.append(i)
    return ls


@coroutine
def method():
    with futures.ProcessPoolExecutor(max_workers=10) as executor:
        job = executor.submit(do_something)
        return (yield from asyncio.wrap_future(job))

@coroutine
def call_method():
    result = yield from method()
    print(result)


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(call_method())
    finally:
        loop.close()


if __name__ == '__main__':
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接