在多进程池中保持boto3会话活动

5

我想要删除许多s3中的文件。 我计划使用一个multiprocessing.Pool来执行所有这些删除操作,但我不确定如何在作业之间保持s3.client的活动状态。 我希望做到这样的事情:

import boto3
import multiprocessing as mp

def work(key):
    s3_client = boto3.client('s3')
    s3_client.delete_object(Bucket='bucket', Key=key)

with mp.Pool() as pool:
    pool.map(work, lazy_iterator_of_billion_keys)

但这样做的问题是,在每个作业的开始都要花费大量的时间来执行s3_client = boto3.client('s3')文档建议为每个进程创建一个新的资源实例,因此我需要一种方法为每个进程创建一个s3客户端。
是否有任何方法可以为池中的每个进程创建持久的s3客户端或缓存客户端?
此外,我计划通过发送密钥批次并使用s3_client.delete_objects来优化删除操作,但为了简单起见,在我的示例中使用了s3_client.delete_object

以下问题及其答案是否有所帮助?https://dev59.com/fFUK5IYBdhLWcg3wwiSW - Emre Sevinç
不,那个问题更多地涉及将对象传递到多进程池中。 - TheStrangeQuark
2个回答

4

请检查RealPython并发教程中的这个片段。由于每个进程都有自己的内存空间,因此您无法共享资源,因此他们为每个进程创建了一个单独的请求会话。相反,他们创建了一个全局会话对象来初始化多处理池,否则,每次调用函数时都会实例化一个会话对象,这是一项昂贵的操作。

因此,按照这种逻辑,您可以以这种方式实例化boto3客户端,并且您只会为每个进程创建一个客户端。

import requests
import multiprocessing
import time

session = None


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

我喜欢这个想法,但尽可能避免使用global和顶层变量。我在我的解决方案中对此方法进行了基准测试,证明它仍然很快。 - TheStrangeQuark
不太确定这里发生了什么,或者它如何帮助。为什么这与在函数外创建全局会话对象不同?在运行时什么情况下会将全局会话变量设置为None? - bgenchel

2
我最终使用了functools.lru_cache和一个获取s3客户端的辅助函数来解决这个问题。 LRU缓存将在进程中保持一致,因此它将保留连接。 辅助函数如下:
from functools import lru_cache

@lru_cache()
def s3_client():
    return boto3.client('s3')

然后在我的work函数中调用它,如下:

def work(key):
    s3_client = s3_client()
    s3_client.delete_object(Bucket='bucket', Key=key)

我能够以以下方式测试和基准测试它:
import os
from time import time

def benchmark(key):
    t1 = time()
    s3 = get_s3()
    print(f'[{os.getpid()}] [{s3.head_object(Bucket='bucket', Key=key)}] :: Total time: {time() - t1} s')

with mp.Pool() as p:
    p.map(benchmark, big_list_of_keys)

这个结果表明,对于每个pid的第一个函数调用大约需要0.5秒,然后对于同一个pid的后续调用大约需要2e-6秒。这足以证明客户端连接被缓存并按照我所期望的方式工作。
有趣的是,如果我没有在s3_client()上使用@lru_cache(),那么后续的调用将需要大约0.005秒,所以boto3内部肯定有一些自动发生的缓存,而我并不知道。
为了测试目的,我以以下方式对Milton's answer进行了基准测试。
s3 = None

def set_global_session():
    global s3
    if not s3:
        s3 = boto3.client('s3')

with mp.Pool(initializer=set_global_session) as p:
    p.map(benchmark, big_list_of_keys)

这也平均每个任务耗时3e-6秒,因此与在辅助函数上使用functools.lru_cache几乎相同。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接