使用多进程池的Boto3客户端出现“botocore.exceptions.NoCredentialsError: Unable to locate credentials”错误。

5

我正在使用boto3连接到S3,下载对象并进行一些处理。 我正在使用多进程池来完成以上操作。

以下是我正在使用的代码概要:

session = None

def set_global_session():
    global session
    if not session:
        session = boto3.Session(region_name='us-east-1')

def function_to_be_sent_to_mp_pool():
    s3 = session.client('s3', region_name='us-east-1')
    list_of_b_n_o = list_of_buckets_and_objects
    for bucket, object in list_of_b_n_o:
        content = s3.get_object(Bucket=bucket, Key=key)
        data = json.loads(content['Body'].read().decode('utf-8'))
        write_processed_data_to_a_location()

def main():
    pool = mp.Pool(initializer=set_global_session, processes=40)
    pool.starmap(function_to_be_sent_to_mp_pool, list_of_b_n_o_i)

现在,当 processes=40 时,一切都运行良好。当 processes=64 时,也很好。

然而,当我增加到 processes=128 时,出现以下错误:

botocore.exceptions.NoCredentialsError: Unable to locate credentials

我们的机器拥有访问S3所需的IAM角色。此外,奇怪的事情是,对于某些进程,它可以正常工作,而对于另一些进程,则会抛出凭据错误。这是为什么?如何解决?
另一个奇怪的事情是,我能够在两个单独的终端标签中触发两个作业(每个标签均有单独的ssh登录shell到该机器)。每个作业生成64个进程,这也正常工作,这意味着同时运行128个进程。但同一个登录shell中的80个进程会失败。
后续:
我尝试在一个方法中为不同的进程创建单独的会话。在另一种方法中,我直接使用创建s3客户端。但是,它们都在80个进程中抛出相同的错误。
我还使用以下额外配置创建了单独的客户端:
Config(retries=dict(max_attempts=40), max_pool_connections=800)

这使我能够同时使用80个进程,但超过80个就会出现相同的错误。

后续帖子:

有人能确认他们是否已经成功地在128个进程中使用了boto3吗?


2
我遇到了完全相同的问题,使用64个进程失败。 - PaF
3个回答

4

我怀疑AWS最近降低了元数据请求的限制,因为我突然遇到了相同的问题。似乎有效的解决方案是在创建池之前查询一次凭据,并让池中的进程显式地使用它们,而不是再次查询凭据。

我正在使用fsspec和s3fs,以下是我的代码:

def get_aws_credentials():
    '''
    Retrieve current AWS credentials.
    '''
    import asyncio, s3fs
    fs = s3fs.S3FileSystem()

    # Try getting credentials
    num_attempts = 5
    for attempt in range(num_attempts):
        credentials = asyncio.run(fs.session.get_credentials())
        if credentials is not None:
            if attempt > 0:
                log.info('received credentials on attempt %s', 1 + attempt)
            return asyncio.run(credentials.get_frozen_credentials())

        time.sleep(15 * (random.random() + 0.5))

    raise RuntimeError('failed to request AWS credentials '
                       'after %d attempts' % num_attempts)


def process_parallel(fn_d, max_processes):
    # [...]
    c = get_aws_credentials()

    # Cache credentials
    import fsspec.config
    prev_s3_cfg = fsspec.config.conf.get('s3', {})
    try:
        fsspec.config.conf['s3'] = dict(prev_s3_cfg,
                                        key=c.access_key,
                                        secret=c.secret_key)

        num_processes = min(len(fn_d), max_processes)

        from concurrent.futures import ProcessPoolExecutor
        with ProcessPoolExecutor(max_workers=num_processes) as pool:
            for data in pool.map(process_file, fn_d, chunksize=10):
                yield data
    finally:
        fsspec.config.conf['s3'] = prev_s3_cfg

原始的boto3代码看起来基本相同,只是不需要使用整个fs.session和asyncio.run()的代码段,你将使用boto3.Session本身并直接调用其get_credentials()和get_frozen_credentials()方法。


4

实际上这是一个获取凭证的竞态条件。我不确定在底层如何获取凭证,但我看到了Stack Overflow上的这个问题和GitHub上的这个票据。

我通过为每个进程保留一个随机等待时间来解决了这个问题。以下是我的更新后的代码,可以正常工作:

client_config = Config(retries=dict(max_attempts=400), max_pool_connections=800)
time.sleep(random.randint(0, num_processes*10)/1000) # random sleep time in milliseconds
s3 = boto3.client('s3', region_name='us-east-1', config=client_config)

我尝试将睡眠时间范围保持在num_processes*10以下,但由于同样的问题再次失败。

@DenisDmitriev,因为您正在获取凭据并明确存储它们,我认为这解决了竞态条件,因此问题得到了解决。

PS:max_attemptsmax_pool_connections的值没有逻辑。我插入了几个值,直到发现竞争条件。


谢谢您的回答,这个解决方案对我有用。但是,我想了解max_attemptsmax_pool_connections的目的是什么? max_attempts是否意味着在关闭boto3客户端之前重试400次(在本例中)?另外,由于不同的线程使用不同的客户端,max_connection_pool在这里如何帮助? - nishant

1

我在多进程环境下遇到了同样的问题。我猜测当你使用多进程时,可能存在客户端初始化问题。因此,我建议你可以使用get函数来获取s3客户端。对我来说这很有效。

g_s3_cli = None

def get_s3_client(refresh=False):
    global g_s3_cli
    if not g_s3_cli or refresh:
        g_s3_cli = boto3.client('s3')
    return g_s3_cli

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接