异步、Lambda、Boto3、ThreadPoolExecutor:连接池已满,正在丢弃连接。

6

一些背景信息:

我正在使用CodePipeline和两个lambda函数开发用于更新和测试全局重定向的测试套件:包装器函数和测试器函数。

包装器循环遍历域名列表。对于每个域名,它获取关联的S3文件,然后创建一个长列表对象(文件中每行一个对象)。例如,一个对象示例:

redirect = {
     'staging': None,
     'prod': None,
     'country': 'US',
     'language': 'EN',
     'status_code': None,
     'shortlink': None,
     'expected': None
}

这个包装器接受对象列表并调用第二个 lambda 函数(测试 lambda)来发送每个对象的一系列 httpx 请求等等。

最重要的是异步处理 - 否则,lambda 将在 15 分钟时限内超时。

实际问题:

这个包装器函数只能触发测试程序10次。否则,当结果返回时,会出现以下错误:

Connection pool is full, discarding connection: lambda.us-east-1.amazonaws.com

我已经将ThreadPoolExecutor的max_workers增加,并为boto3设置了max_workers。 我与AWS支持人员交谈过,我的实际Lambda设置有利于触发我的测试Lambda的多次调用。 然而,我仍然会遇到连接池错误。

相关代码:

from botocore.client import Config
from boto3.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3

max_pool_connections = 30

config = Config(
    max_pool_connections=max_pool_connections,
    read_timeout = 900
)

def handler(event, context):
    try:
        events = ... # generated via a bunch of nonsense 

        with ThreadPoolExecutor(max_pool_connections) as executor:
            futures = []
            for event in events:
                print(event)
                future = executor.submit(lambda_client.invoke, FunctionName = "site-tester", InvocationType = "RequestResponse", Payload = json.dumps(event))
                futures.append(future)

            for index, future in enumerate(as_completed(futures, timeout=None), start=1):
                ...

这显然不是完整的代码,因为完整的代码是一团混乱。我暂时没有时间编写完整的测试功能,但如果有人有任何初始想法或故障排除提示,将不胜感激。

我想要指出的是,对于那10个被调用并返回的事件,一切都按预期工作。但它只限于10次,而我绝对需要更多次数。

再次强调最重要的是它是异步的-否则,Lambda函数将在15分钟时超时。

并发设置

包装器函数:保留并发设置为30

测试函数:保留并发设置为30


1
我强烈建议您查看AWS Step Functions。它们更适合您在这里所做的事情。 - Jens
谢谢!我一定会看一下。我认为我最担心的是步骤函数的成本,这取决于如何重新设计这个工作流程。 - Falpangaea
1
始终牢记成本是个好习惯。根据我的经验,与其他通常使用的资源相比,Lambda 的成本足够低廉,不会在大多数情况下造成影响。如果成本真的是一个问题,您可以考虑批量运行 Lambda,而不是每个 URL 运行一个 Lambda 等等。此外,请查看运行速度快且不占用大量内存的运行时。据我所知,Go 是您最好的选择。Dotnetcore 也很快,但使用的内存要多得多。(请记住,您按持续时间 * 预配内存付费)。 - Jens
2个回答

9

正如 JustinTArthur 在 Github 上所述:

这个警告是正常的。如果你深入挖掘 urllib3 连接池代码,它基本上是一组持久连接的池,而不是您可以拥有的最大并发连接数。如果连接没有被端点关闭,则会重用池中的连接。

如果您想增加此池的大小,可以使用低级别的 botocore 配置为每个端点完成。 boto3 示例:

import boto3
import botocore

client_config = botocore.config.Config(
    max_pool_connections=25,
)
boto3.client('lambda', config=client_config)

或者
s3_client = boto3.client('s3', config=botocore.client.Config(max_pool_connections=50))

4
所以警告本身来自urllib3库,boto3使用该库进行HTTP请求。

max_pool_connections配置选项为ConnectionPool类设置了maxsize

您可以尝试进一步增加此选项以查看是否有帮助,但不要添加更多的线程工作者。

目前不清楚您如何从代码片段中创建客户端,但似乎资源不是线程安全的,应为每个线程/进程创建单独的资源:

多线程和多进程:

注意

资源不是线程安全的。这些特殊类包含了不能在多个线程之间共享的附加元数据。使用资源时,建议为每个线程实例化一个新的资源,如上面的示例所示。

低级客户端线程安全的。使用低级客户端时,建议实例化您的客户端,然后将该客户端对象传递给您的每个线程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接