Python异步IO没有显示任何错误。

5

我正在尝试使用asyncio从成千上万个URL中获取一些数据。

以下是设计的简要概述:

  1. 用单个Producer一次性填充一个Queue,其中包含一堆URL
  2. 生成一堆Consumers
  3. 每个Consumer保持异步地从Queue中提取URL并发送GET请求
  4. 对结果进行后处理
  5. 组合所有处理后的结果并返回

问题:asyncio几乎从不显示任何错误,它只是在没有错误的情况下静默挂起。我到处放置了print语句以自己检测问题,但效果不大。

根据输入URL数量和消费者数量或限制,可能会出现以下错误:

  1. Task was destroyed but it is pending!
  2. task exception was never retrieved future: <Task finished coro=<consumer()
  3. aiohttp.client_exceptions.ServerDisconnectedError
  4. aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine

问题:如何检测和处理asyncio中的异常?如何在不破坏Queue的情况下进行重试?

以下是我编写的代码,我在查看各种异步代码示例时编译了它。目前,在def get_video_title函数末尾有一个故意引发的错误。运行时什么也没有显示。

import asyncio
import aiohttp
import json
import re
import nest_asyncio
nest_asyncio.apply() # jupyter notebook throws errors without this


user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"

def get_video_title(data):
    match = re.search(r'window\[["\']ytInitialPlayerResponse["\']\]\s*=\s*(.*)', data)
    string = match[1].strip()[:-1]
    result = json.loads(string)
    return result['videoDetails']['TEST_ERROR'] # <---- should be 'title'

async def fetch(session, url, c):
    async with session.get(url, headers={"user-agent": user_agent}, raise_for_status=True, timeout=60) as r:
        print('---------Fetching', c)
        if r.status != 200:
            r.raise_for_status()
        return await r.text()

async def consumer(queue, session, responses):
    while True:
        try:
            i, url = await queue.get()
            print("Fetching from a queue", i)
            html_page = await fetch(session, url, i)

            print('+++Processing', i)
            result = get_video_title(html_page) # should raise an error here!
            responses.append(result)
            queue.task_done()

            print('+++Task Done', i)

        except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
            print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Error', i, type(e))
            await asyncio.sleep(1)
            queue.task_done()

async def produce(queue, urls):
    for i, url in enumerate(urls):
        print('Putting in a queue', i)
        await queue.put((i, url))

async def run(session, urls, consumer_num):
    queue, responses = asyncio.Queue(maxsize=2000), []

    print('[Making Consumers]')
    consumers = [asyncio.ensure_future(
        consumer(queue, session, responses)) 
                 for _ in range(consumer_num)]

    print('[Making Producer]')
    producer = await produce(queue=queue, urls=urls)

    print('[Joining queue]')
    await queue.join()

    print('[Cancelling]')
    for consumer_future in consumers:
        consumer_future.cancel()

    print('[Returning results]')
    return responses

async def main(loop, urls):
    print('Starting a Session')
    async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
        print('Calling main function')
        posts = await run(session, urls, 100)
        print('Done')
        return posts


if __name__ == '__main__':
    urls = ['https://www.youtube.com/watch?v=dNQs_Bef_V8'] * 100
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(loop, urls))
1个回答

11
问题在于您的consumer仅捕获两个非常特定的异常,并在这些情况下将任务标记为已完成。如果发生任何其他异常,例如与网络相关的异常,它将终止消费者。然而,这并没有被run检测到,后者正在等待使用消费者的queue.join()(实际上)在后台运行。这就是为什么您的程序会挂起 - 排队的项目永远不会被计算,队列也永远不会完全处理。
有两种方法可以解决此问题,具体取决于在遇到意外异常时您想要程序做什么。如果您希望它保持运行状态,则可以向消费者添加一个万能的except子句,例如:
        except Exception as e
            print('other error', e)
            queue.task_done()

另外一种选择是让未处理的消费者异常传播到run。虽然这必须要明确地安排,但它有一个好处:不允许异常悄悄地传递。 (有关该主题的详细处理,请参见此文章。)实现它的一种方法是同时等待 queue.join() 和消费者;由于消费者处于无限循环中,只有在异常情况下才能完成。

    print('[Joining queue]')
    # wait for either `queue.join()` to complete or a consumer to raise
    done, _ = await asyncio.wait([queue.join(), *consumers],
                                 return_when=asyncio.FIRST_COMPLETED)
    consumers_raised = set(done) & set(consumers)
    if consumers_raised:
        await consumers_raised.pop()  # propagate the exception

问题:如何在asyncio中检测和处理异常?

异常会通过await传播,并且通常像在其他代码中一样被检测和处理。特殊处理只需要捕获从“后台”任务(如consumer)泄漏的异常。

如何在不破坏队列的情况下重试?

您可以在except块中调用await queue.put((i, url))。该项将添加到队列的末尾,以由消费者接收。在这种情况下,您只需要第一个代码片段,而不需要尝试将consumer中的异常传播到run中。


感谢您提供如此清晰详细的答案。在阅读您的回答之前,我已经采取了以下措施来解决我提到的问题: 1)对于重试,我在while循环内添加了一个for循环。 2)我还在get_video_title函数中添加了KeyError异常。 3)我降低了最大消费者数量并限制了最大TCP连接数量。 4)我在GET请求中将raise_for_status设置为False。总的来说,这解决了所有问题,我能够在一分钟多一点的时间内处理1000个URL。任务管理器显示瓶颈是我的Wi-Fi速度。 - Superbman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接