Difference between aiohttp.TCPConnector (with limit argument) and asyncio.Semaphore for limiting the number of concurrent connections

22

I wanted to learn Python's new async/await syntax, more specifically by writing a simple script that lets me download multiple resources at the same time.

But now I'm stuck.

While researching I found two options to limit the number of concurrent requests:

  1. Pass an aiohttp.TCPConnector (with a limit argument) to an aiohttp.ClientSession; or
  2. Use an asyncio.Semaphore.

Is there a preferred option, or are they interchangeable if all you want is to limit the number of concurrent connections? Are they (roughly) equal in terms of performance?

Also, they both seem to have a default value of 100 concurrent connections/operations. If I use only a Semaphore with a limit of, say, 500, will the aiohttp internals implicitly lock me down to 100 concurrent connections?

This is all very new and unclear to me. Please feel free to point out any misunderstandings on my part or flaws in my code.

Here is my current code containing both options (which one should I delete?):

Bonus questions:

  1. How do I handle (ideally retry x times) coros that threw an error?
  2. What is the best way to save the returned data (notify my DataHandler) as soon as a coro finishes? I don't want everything to be saved at the end, because I could start working with the results as soon as possible.


import asyncio
from tqdm import tqdm
import uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth

# You can ignore this class
class DummyDataHandler:  # the real DataHandler base class is not shown in the question
    """Takes data and stores it somewhere"""

    def take(self, origin_url, data):
        return True

    def done(self):
        return None

class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):

        self.concurrent_connections = concurrent_connections
        self.silent = silent

        self.data_handler = data_handler or DummyDataHandler()

        self.sending_bar = None
        self.receiving_bar = None

        asyncio.set_event_loop_policy(loop_policy or uvloop.EventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.semaphore = asyncio.Semaphore(concurrent_connections)

    async def fetch(self, session, url):
        # This is option 1: The semaphore, limiting the number of concurrent coros,
        # thereby limiting the number of concurrent requests.
        async with self.semaphore:  # "with (await semaphore)" is deprecated syntax
            async with session.get(url) as response:
                # Bonus Question 1: What is the best way to retry a request that failed?
                if not self.silent:
                    self.sending_bar.update(1)
                resp = await response.read()
                # "async with" releases the response; no explicit release() is needed
                if not self.silent:
                    self.receiving_bar.update(1)
                return resp

    async def batch_download(self, urls, auth=None):
        # This is option 2: Limiting the number of open connections directly via the TCPConnector
        conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
        async with ClientSession(connector=conn, auth=auth) as session:
            await asyncio.gather(*[asyncio.ensure_future(self.download_and_save(session, url)) for url in urls])

    async def download_and_save(self, session, url):
        content = await self.fetch(session, url)
        # Bonus Question 2: This is blocking, I know. Should this be wrapped in another coro
        # or should I use something like asyncio.as_completed in the download function?
        self.data_handler.take(origin_url=url, data=content)

    def download(self, urls, auth=None):
        if isinstance(auth, tuple):
            auth = BasicAuth(*auth)
        print('Running on concurrency level {}'.format(self.concurrent_connections))
        self.sending_bar = tqdm(urls, total=len(urls), desc='Sent    ', unit='requests')
        self.sending_bar.update(0)

        self.receiving_bar = tqdm(urls, total=len(urls), desc='Received', unit='requests')
        self.receiving_bar.update(0)

        tasks = self.batch_download(urls, auth)
        self.loop.run_until_complete(tasks)
        return self.data_handler.done()


### call like so ###

URL_PATTERN = 'https://www.example.com/{}.html'

def gen_url(lower=0, upper=None):
    for i in range(lower, upper):
        yield URL_PATTERN.format(i)   

ad = AsyncDownloader(concurrent_connections=30)
data = ad.download(list(gen_url(upper=1000)))

I had the same question, and it looks like they can be used interchangeably. https://dev59.com/kFsW5IYBdhLWcg3wQVX2 - Glen Thompson
The internal counter of the asyncio.Semaphore class defaults to just 1; see the asyncio synchronization primitives. It can be increased to a higher value as needed, but your operating system will still impose a limit on the number of simultaneously open files (TCP connections are files on *nix systems, including macOS). - Darkfish
For bonus question 2, look into the producer-consumer design pattern from software architecture. - Darkfish
Usually I prefer to see a minimal amount of code describing a problem, but I just discovered tqdm here. No more hand-writing ASCII spinners, thank you! - Joseph Sheedy
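The producer-consumer pattern suggested in the comments can be sketched with an asyncio.Queue: each download puts its result on the queue as soon as it finishes, and a single consumer hands results off (e.g. to a data handler) immediately, instead of waiting for the whole gather() to complete. This is a minimal sketch, assuming the real HTTP call is replaced by asyncio.sleep(); the names produce, consume and the 'data-for-' payload are purely illustrative:

```python
import asyncio

async def produce(queue, url):
    # Stand-in for a real download; a real fetch() would await an HTTP request here.
    await asyncio.sleep(0.01)
    await queue.put((url, 'data-for-' + url))

async def consume(queue, results):
    # Handle each result as soon as it arrives instead of waiting for gather().
    while True:
        origin_url, data = await queue.get()
        results.append((origin_url, data))  # e.g. self.data_handler.take(...)
        queue.task_done()

async def main(urls):
    queue = asyncio.Queue()
    results = []
    consumer = asyncio.ensure_future(consume(queue, results))
    await asyncio.gather(*(produce(queue, url) for url in urls))
    await queue.join()  # block until every queued item has been handled
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return results

loop = asyncio.new_event_loop()
results = loop.run_until_complete(main(['a', 'b', 'c']))
loop.close()
```

Because the consumer runs concurrently with the producers, each result is processed as soon as its coro finishes, which is exactly what bonus question 2 asks for.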
2 Answers

3

Is there a preferred option?

Yes, see below:

Will the aiohttp internals implicitly lock me down to 100 concurrent connections?

Yes, the default value of 100 applies unless you specify another limit. You can see it in the source here: https://github.com/aio-libs/aiohttp/blob/master/aiohttp/connector.py#L1084

Are they (roughly) equal in terms of performance?

No (although the performance difference should be negligible), since aiohttp.TCPConnector checks for available connections anyway, whether or not it is surrounded by a Semaphore; using a Semaphore here would just add unnecessary overhead.
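To make concrete what such a limit actually does, here is a pure-asyncio sketch that tracks the peak number of coroutines inside a Semaphore at once; the TCPConnector limit bounds things in the same spirit, just at the connection-pool level. The workers only sleep instead of performing real requests, and LIMIT/worker are illustrative names:

```python
import asyncio

LIMIT = 5
running = 0
peak = 0

async def worker(semaphore):
    # Count how many coroutines are inside the semaphore at the same time.
    global running, peak
    async with semaphore:
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for an HTTP request
        running -= 1

async def main():
    semaphore = asyncio.Semaphore(LIMIT)
    # 50 "requests", but at most LIMIT of them are in flight simultaneously.
    await asyncio.gather(*(worker(semaphore) for _ in range(50)))

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
print(peak)
```

With 50 queued workers the peak concurrency settles at exactly LIMIT, which is the same guarantee the connector's limit gives for open connections.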

How do I handle (ideally retry x times) coros that threw an error?

I don't believe there is a standard way to do it, but one solution would be to wrap your calls in a method like this:

async def retry_requests(...):
    for i in range(5):
        try:
            return await session.get(...)
        except aiohttp.ClientResponseError:
            pass
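The same idea can be made a little more complete as a generic coroutine wrapper with exponential backoff. This is a sketch, not aiohttp API: retry_async and the flaky example coroutine are illustrative names, and the wrapper takes a coroutine factory because a coroutine object cannot be awaited twice:

```python
import asyncio

async def retry_async(coro_factory, exceptions, tries=3, delay=0.01, backoff=2):
    # Re-create the coroutine for every attempt via coro_factory().
    for attempt in range(tries - 1):
        try:
            return await coro_factory()
        except exceptions:
            await asyncio.sleep(delay)  # non-blocking backoff
            delay *= backoff
    return await coro_factory()  # last attempt: let the error propagate

# A flaky coroutine that fails twice before succeeding.
calls = {'n': 0}

async def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ValueError('boom')
    return 'ok'

loop = asyncio.new_event_loop()
result = loop.run_until_complete(retry_async(flaky, (ValueError,), tries=5))
loop.close()
print(result, calls['n'])  # → ok 3
```

In the downloader above you would pass something like `lambda: self.fetch(session, url)` as the factory, so each retry issues a fresh request.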

0

How do I handle (ideally retry x times) coros that threw an error?

I created a Python decorator to handle that:

import logging
import time
from functools import wraps


def retry(exceptions, tries=3, delay=2, backoff=2):
    """
    Retry calling the decorated function using an exponential backoff. This
    is required in case of requesting Braze API produces any exceptions.

    Args:
        exceptions: The exception to check. May be a tuple of
            exceptions to check.
        tries: Number of times to try (not retry) before giving up.
        delay: Initial delay between retries in seconds.
        backoff: Backoff multiplier (e.g. a value of 2 will double the delay
            each retry).
    """

    def deco_retry(func):
        @wraps(func)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    msg = '{}, Retrying in {} seconds...'.format(e, mdelay)
                    logging.warning(msg)
                    # Note: time.sleep() blocks the whole event loop; inside a
                    # coroutine use "await asyncio.sleep(mdelay)" instead.
                    time.sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff
            return func(*args, **kwargs)

        return f_retry

    return deco_retry
