用Python并行下载多个S3对象

15

有没有一种方法可以使用Python3中的boto3库同时下载S3文件? 我知道aiobotocore库,但我想知道是否有一种使用标准boto3库的方法。


通过查看代码(http://boto3.readthedocs.io/en/latest/_modules/boto3/s3/transfer.html),我认为它已经自动完成了(在我粘贴的链接中查找“max_concurrency”)。 - Savir
aiobotocore 不应该使用的常见原因是什么? - Alex R
3个回答

17

如果您想使用boto3直接并行下载许多较小的文件到磁盘,可以使用multiprocessing模块。这是一个小片段,可以实现此功能。您可以像这样运行它:./download.py bucket_name s3_key_0 s3_key_1 ... s3_key_n

#!/usr/bin/env python3
import multiprocessing
import boto3
import sys

# make a per process s3_client
s3_client = None
def initialize():
  global s3_client
  s3_client = boto3.client('s3')

# the work function of each process which will fetch something from s3
def download(job):
  bucket, key, filename = job
  s3_client.download_file(bucket, key, filename)

if __name__ == '__main__':
  # make the jobs, arguments to program are: bucket s3_key_0 s3_key_1 ... s3_key_n
  bucket = sys.argv[1]
  jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:] ]

  # make a process pool to do the work
  pool = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
  pool.map(download, jobs)
  pool.close()
  pool.join()

其中一个重要的内容是我们为每个进程创建一个S3客户端实例,每个进程都会重复使用这个实例。这很重要有两个原因。首先,创建客户端很慢,因此我们希望尽可能少地进行这样的操作。其次,不应在进程之间共享客户端,因为对 download_file 的调用可能会更改客户端的内部状态。


尝试共享内部s3客户端状态是否是需要使用多进程而不是多线程的原因?我有一个类似的需求,需要下载许多较小的文件,并且想知道由于多进程池启动时间较长而导致的影响。 - Richard
2
只要为池中的每个线程创建一个客户端,使用线程池的多线程应该可以工作。相比具有线程本地存储的线程,多进程似乎需要更少的代码来使用“全局”技巧来说明这一点。 - Kevin Kreiser
我尝试过使用单个客户端为所有线程和每个线程一个客户端 - 两者都可以正常工作。...看起来boto团队对于必要的内容存在显著(轻描淡写!)的不确定性和缺乏清晰度。 - Richard
@Richard 可能他们故意将这些信息混淆,以便更轻松地对核心假设进行更改,例如最近的版本可能变得线程安全,但旧版本则不是。正如你所说,目前还是个谜! - Kevin Kreiser
不确定。肯定知道有一个GitHub线程,许多人已经反复问了几个月,但他们没有任何声明。不幸的是,我认为这并不是他们应有的责任感。 - Richard
有没有不使用 global 关键字的方法来实现它? - Vineet

0
面对未知的线程安全状态,这是使用Python>=3.7中多进程编程的一种方法,来使用boto3.Client
import os
from multiprocessing import Pool
from typing import Generator, Iterable, List
from urllib.parse import urlparse

import boto3
from jsonargparse import CLI


def batcher(iterable: Iterable, batch_size: int) -> Generator[List, None, None]:
    """Batch an iterator. The last item might be of smaller len than batch_size.

    Args:
        iterable (Iterable): Any iterable that should be batched
        batch_size (int): Len of the generated lists

    Yields:
        Generator[List, None, None]: List of items in iterable
    """
    batch = []
    counter = 0
    for i in iterable:
        batch.append(i)
        counter += 1
        if counter % batch_size == 0:
            yield batch
            batch = []
    if len(batch) > 0:
        yield batch


def download_batch(batch):
    s3 = boto3.client("s3")
    n = 0
    for line in batch:
        dst, line = line
        url = urlparse(line)
        url_path = url.path.lstrip("/")
        folder, basename = os.path.split(url_path)
        dir = os.path.join(dst, folder)
        os.makedirs(dir, exist_ok=True)
        filepath = os.path.join(dir, basename)
        print(f"{filepath}")
        s3.download_file(url.netloc, url_path, filepath)
        n += 1
    return n


def file_reader(fp, dst):
    with open(fp) as f:
        for line in f:
            line = line.rstrip("\n")
            yield dst, line


def copy_cli(txt_path: str, dst: str = os.getcwd(), n_cpus: int = os.cpu_count()):
    """Copy files from s3 based on a list of urls. The output folder structure follows
    the s3 path.

    Args:
        txt_path (str): path to your list of files. One url per line.
        dst (str): path to store the files.
        n_cpus (int): number of simultaneous batches. Defaults to the number of cpus in
         the computer.
    """
    total_files = sum([1 for _ in file_reader(txt_path, dst)])
    print(n_cpus)
    n_cpus = min(total_files, n_cpus)
    batch_size = total_files // n_cpus
    with Pool(processes=n_cpus) as pool:
        for n in pool.imap_unordered(
            download_batch, batcher(file_reader(txt_path, dst), batch_size)
        ):
            pass


if __name__ == "__main__":
    CLI(copy_cli)

使用方法

pip install jsonargparse boto3

my_list.txt

s3://path/to/file1.abc
s3://path/to/file2.cdf

python s3cp.py my_list.txt --dst ../my_dst_path/ --n_cpus=5

希望能对您有所帮助。您可以在此存储库中找到相同的代码https://github.com/fcossio/s3-selective-copy


0
以下代码片段将允许您使用多进程从S3下载多个对象。
import boto3
import multiprocessing as mp
import os

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('My_bucket')
        
def s3download(object_key_file):
    my_bucket.download_file(object_key_file[0], object_key_file[1])
    print('downloaded file with object name... {}'.format(object_key_file[0]))
    print('downloaded file with file name... {}'.format(object_key_file[1]))
        
def parallel_s3_download():
    object_key_file=[]
    for s3_object in my_bucket.objects.filter(Prefix="directory_name/"):
        # Need to split s3_object.key into path and file name, else it will give error file not found.
        path, filename = os.path.split(s3_object.key)
        object_key_file.append((s3_object.key,filename))
    object_key_file.pop(0)
    pool = mp.Pool(min(mp.cpu_count(), len(object_key_file)))  # number of workers
    pool.map(s3download, object_key_file, chunksize=1)
    pool.close()
if __name__ == "__main__":
    parallel_s3_download()
    print('downloading zip file')

object_key_file.pop(0)是用来干什么的?此外,我遇到了这个错误:TypeError: s3download()缺少一个必需的位置参数:'object_key_file'。 - im281

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接