如何使用asyncio并行计算?

3
我有一段需要执行时间很长且需要大量CPU资源的代码块。我想要运行这个代码块多次,并充分利用我的CPU资源。通过查看asyncio,我了解到它主要用于异步通讯,但也是用于异步任务的通用工具。
下面的示例中,time.sleep(y)是我想要运行的代码的占位符。在此示例中,每个协程依次执行,整个执行过程大约需要8秒钟。
import asyncio
import logging
import time


async def _do_compute_intense_stuff(x, y, logger):
    logger.info('Getting it started...')
    for i in range(x):
        time.sleep(y)
    logger.info('Almost done')
    return x * y

logging.basicConfig(format='[%(name)s, %(levelname)s]: %(message)s', level='INFO')
logger = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
co_routines = [
    asyncio.ensure_future(_do_compute_intense_stuff(2, 1, logger.getChild(str(i)))) for i in range(4)]
logger.info('Made the co-routines')
responses = loop.run_until_complete(asyncio.gather(*co_routines))
logger.info('Loop is done')
print(responses)

当我将time.sleep(y)替换为asyncio.sleep(y)时,它几乎立即返回结果。而使用await asyncio.sleep(y)则需要大约2秒钟。
是否有一种方法可以使用这种方法并行执行我的代码,或者应该使用multiprocessingthreading?我需要将time.sleep(y)放入线程中吗?

3
不需要使用asyncio。当你遇到“等待I/O操作完成”的问题时,asyncio非常有效。而高强度计算并不是这种问题。应该使用多进程。只有在使用一些C扩展库进行重型计算时,才应该使用线程,并且该库会在执行重型计算时释放GIL。 - Martijn Pieters
2
Asyncio还要求您的所有代码协作。每个await是一个地方,让您的任务告诉事件循环,如果其他任务没有在等待,则它愿意运行。 time.sleep()则正好相反。它会阻止一切,因此事件循环无法切换任务。 - Martijn Pieters
2
asyncio.sleep() 会生成一个协程。如果你不在它上面使用 await,它将不会执行任何操作,因此是的,你会立即看到返回结果。 - Martijn Pieters
谢谢@MartijnPieters,这解决了一些困惑! - Benjamin
2个回答

5
执行者使用多线程来完成这个任务(或者如果您喜欢,可以使用多进程)。 Asyncio用于优化代码,其中您经常等待输入输出操作运行。有时可以写入文件或加载网站。
然而,在需要进行大量计算的操作中(不仅仅依赖于等待IO),建议使用类似于线程的东西,并且在我看来,concurrent.futures为此提供了非常好的包装器,它类似于Asyncio的包装器。
Asyncio.sleep之所以能使您的代码运行更快,是因为它启动函数,然后开始检查协程是否准备就绪。这对于需要进行大量计算的操作来说并不是很好,因为没有IO需要等待。
要将以下示例从多进程更改为多线程,只需将ProcessPoolExecutor更改为ThreadPoolExecutor
这是一个多进程示例:
import concurrent.futures
import time

def a(z):
    time.sleep(1)
    return z*12

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(a, i) for i in range(5)}
        for future in concurrent.futures.as_completed(futures):
            data = future.result()
            print(data)

这是一个简化版的示例,源自于执行器文档中提供的示例

到目前为止,感谢您的帮助。它似乎在某种程度上起作用了。函数是并发执行的,但不是并行执行的(我猜)。当我执行一次我的函数时,它会占用整个核心并花费20秒钟。当我执行2个函数时,需要1分钟,并且没有任何一个核心达到100%。有什么想法吗? - Benjamin
处理器不会达到100%,因为您只有一个核心。如果您希望完全加载CPU,请使用多进程。请记住,在启动进程之前,您可能需要预先准备数据,因为当第二个进程运行时,队列之间的通信有时可能会很棘手。 - user6767685
@Benjamin 更新了答案,使用了多进程技术。 - Neil
1
来自Java,其中每个线程在一个核心上运行,这个[https://dev59.com/TXA75IYBdhLWcg3w6Ndj#3046201](问题)帮助我理解了Python中进程和线程的区别。然而,我的实际代码不想在进程中运行,因为我在那里使用了无法被pickled的类。由于我没有进程间通信,所以我放弃了好的库,只是用一个包装器用`Popen`启动我的脚本,并在`ThreadPoolExecutor`中从stdout读回。 - Benjamin
既然你的回答解决了问题,我会接受它。 - Benjamin

0

简单示例

这个示例来自于https://www.blog.pythonlibrary.org/2016/07/26/python-3-an-intro-to-asyncio/

它对我很有帮助。还有一个“坏的示例”——这让我受益更多^^

import aiohttp
import asyncio
import async_timeout
import os

async def download_coroutine(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            filename = os.path.basename(url)
            with open(filename, 'wb') as f_handle:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    f_handle.write(chunk)
            return await response.release()

async def main(loop):
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download_coroutine(session, url) for url in urls]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接