如何在asyncio中并发运行任务?

10

我正在尝试学习如何使用Python的asyncio模块并发运行任务。在以下代码中,我有一个模拟的“网络爬虫”作为示例。基本上,我想让在任何给定时间最多只有两个活动的fetch()请求,并且我希望在sleep()期间调用process()。

import asyncio

class Crawler():

    urlq = ['http://www.google.com', 'http://www.yahoo.com', 
            'http://www.cnn.com', 'http://www.gamespot.com', 
            'http://www.facebook.com', 'http://www.evergreen.edu']

    htmlq = []
    MAX_ACTIVE_FETCHES = 2
    active_fetches = 0

    def __init__(self):
        pass

    async def fetch(self, url):
        self.active_fetches += 1
        print("Fetching URL: " + url);
        await(asyncio.sleep(2))
        self.active_fetches -= 1
        self.htmlq.append(url)

    async def crawl(self):
        while self.active_fetches < self.MAX_ACTIVE_FETCHES:
            if self.urlq:
                url = self.urlq.pop()
                task = asyncio.create_task(self.fetch(url))
                await task
            else:
                print("URL queue empty")
                break;

    def process(self, page):
        print("processed page: " + page)

# main loop

c = Crawler()
while(c.urlq):
    asyncio.run(c.crawl())
    while c.htmlq:
        page = c.htmlq.pop()
        c.process(page)

然而,上述代码是逐个下载URL(不是并发下载两个URL),并且在获取所有URL之后才进行任何“处理”。我该如何使fetch()任务并发运行,并在sleep()期间调用process()函数?


我的终极目标是编写一个异步网络爬虫,它将在后台不断从URL队列中获取页面,并与抓取同时进行HTML/文本处理。这只是模拟代码,以学习使用asyncio...这就是为什么我正在尝试在抓取过程中并行运行处理,而不仅仅是在之后执行。 - J. Taylor
1
运行事件循环是一项阻塞操作。您需要将process变成一个协程,以便与循环一起调度或由其他等待它的东西来执行。 - dirn
2个回答

15

您的 crawl 方法在每个单独的任务后等待;建议修改为以下方式:

async def crawl(self):
    tasks = []
    while self.active_fetches < self.MAX_ACTIVE_FETCHES:
        if self.urlq:
            url = self.urlq.pop()
            tasks.append(asyncio.create_task(self.fetch(url)))
    await asyncio.gather(*tasks)
< p >编辑:这里有一个更干净的版本,带有注释,可以同时获取和处理所有内容,同时保留了限制最大获取器数量的基本能力。

import asyncio

class Crawler:

    def __init__(self, urls, max_workers=2):
        self.urls = urls
        # create a queue that only allows a maximum of two items
        self.fetching = asyncio.Queue()
        self.max_workers = max_workers

    async def crawl(self):
        # DON'T await here; start consuming things out of the queue, and
        # meanwhile execution of this function continues. We'll start two
        # coroutines for fetching and two coroutines for processing.
        all_the_coros = asyncio.gather(
            *[self._worker(i) for i in range(self.max_workers)])

        # place all URLs on the queue
        for url in self.urls:
            await self.fetching.put(url)

        # now put a bunch of `None`'s in the queue as signals to the workers
        # that there are no more items in the queue.
        for _ in range(self.max_workers):
            await self.fetching.put(None)

        # now make sure everything is done
        await all_the_coros

    async def _worker(self, i):
        while True:
            url = await self.fetching.get()
            if url is None:
                # this coroutine is done; simply return to exit
                return

            print(f'Fetch worker {i} is fetching a URL: {url}')
            page = await self.fetch(url)
            self.process(page)

    async def fetch(self, url):
        print("Fetching URL: " + url);
        await asyncio.sleep(2)
        return f"the contents of {url}"

    def process(self, page):
        print("processed page: " + page)


# main loop
c = Crawler(['http://www.google.com', 'http://www.yahoo.com', 
             'http://www.cnn.com', 'http://www.gamespot.com', 
             'http://www.facebook.com', 'http://www.evergreen.edu'])
asyncio.run(c.crawl())

谢谢dtanabe。您的解决方案确实使fetch()调用并发运行,但它仍然推迟处理,直到所有调用都被获取。我该如何做才能在下载正在进行时,在htmlq中放置的任何HTML上调用process()? - J. Taylor
1
添加了更完整的版本,可以在允许其他URL获取的同时处理获取到的数据。 - dtanabe

1
您可以将htmlq变为asyncio.Queue(),并将htmlq.append改为htmlq.push。然后,您的main函数可以是异步的,就像这样:
async def main():
    c = Crawler()
    asyncio.create_task(c.crawl())
    while True:
        page = await c.htmlq.get()
        if page is None:
            break
        c.process(page)

您的顶级代码归结为对 asyncio.run(main()) 的调用。

一旦完成爬取,crawl() 可以将 None 入队,以通知主协程工作已完成。


3
这样做的额外好处是,通过将队列大小设置为2,无需跟踪并发获取的数量。 - dirn
谢谢@user4815162342。在你的提示下,我知道了asyncio.Queue()这个方法,这对我很有帮助。但是当我试图按照您的建议修改代码时,出现了以下错误:"* RuntimeError:Task <Task pending coro = <Crawler.crawl()running at testasync.py:39> cb = [_run_until_complete_cb()at /usr/lib/python3.7/asyncio/base_events.py:158]> got Future <Future pending> attached to a different loop*" ... 我不确定是否需要以某种方式修改crawl(),使其与您的代码一起工作,或者发生了什么事情,但似乎在“page = await self.htmlq.get()”这一行上出现了故障...有什么想法吗? - J. Taylor
2
@J.Taylor 问题在于 Queue() 需要在相同的事件循环中实例化,而每个 asyncio.run() 都会创建一个 新的 事件循环。在您的情况下,解决方法是将 Crawler 实例化移动到协程中;我已经相应地编辑了答案。 - user4815162342

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接