GCS - Downloading blobs with a directory structure in Python

I'm using a combination of the GCS Python SDK and the Google API client to loop through a version-enabled bucket and download specific objects based on metadata.
from google.cloud import storage
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

def downloadepoch_objects():
    # service, bucket_name, source_bucket, and restore_epoch are set up
    # as in the full solution posted in the answers below
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()

    for item in response['items']:
        if item['metadata']['epoch'] == restore_epoch:
            print(item['bucket'])
            print(item['name'])
            print(item['metadata']['epoch'])
            print(item['updated'])
            blob = source_bucket.blob(item['name'])
            blob.download_to_filename(
                '/Users/admin/git/data-processing/{}'.format(item['name']))


downloadepoch_objects()

The above function works fine for blobs that aren't inside a directory (gs://bucketname/test1.txt), since the item passed is just test1.txt. The problem I'm running into is when trying to download files from a complex directory tree (gs://bucketname/nfs/media/docs/test1.txt), where the item passed is nfs/media/docs/test1.txt. Is it possible to have the .download_to_filename() method create directories when necessary?

3 Answers

Below is the solution that worked. I ended up stripping the path from the object name and creating the directory structure on the fly. A better way might be to use "prefix + response['prefixes'][0]" as @Brandon Yarbrough suggested, but I couldn't quite figure that out. Hope this helps someone else.
#!/usr/local/bin/python3

from google.cloud import storage
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json
import os
import pathlib

bucket_name = 'test-bucket'
restore_epoch = '1519189202'
restore_location = '/Users/admin/data/'

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storage', 'v1', credentials=credentials)

storage_client = storage.Client()
source_bucket = storage_client.get_bucket(bucket_name)


def listall_objects():
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()
    print(json.dumps(response, indent=2))


def listname_objects():
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()

    for item in response['items']:
        print(item['name'] + ' Uploaded on: ' + item['updated'] +
              ' Epoch: ' + item['metadata']['epoch'])


def downloadepoch_objects():
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()

    for item in response['items']:
        # Skip objects whose 'epoch' metadata tag doesn't match
        if item.get('metadata', {}).get('epoch') != restore_epoch:
            continue
        print('Downloading ' + item['name'] + ' from ' +
              item['bucket'] + '; Epoch= ' + item['metadata']['epoch'])
        print('Saving to: ' + restore_location)
        blob = source_bucket.blob(item['name'])
        # Recreate the object's directory tree locally before downloading;
        # os.makedirs handles nested paths, which os.mkdir cannot
        path = pathlib.Path(restore_location + item['name']).parent
        os.makedirs(path, exist_ok=True)
        blob.download_to_filename(restore_location + item['name'])
        print('Download complete')


# listall_objects()
# listname_objects()
downloadepoch_objects()
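
For reference, the same epoch-based restore can also be done with just the newer google-cloud-storage client, without the discovery API, since each listed blob exposes its custom metadata directly. A minimal sketch, assuming the same bucket_name, restore_epoch, and restore_location as above:

import os
import pathlib
from google.cloud import storage


def download_epoch_objects_modern():
    # Sketch using only google-cloud-storage; bucket_name, restore_epoch,
    # and restore_location are assumed to be defined as above
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    for blob in bucket.list_blobs(versions=True):
        # blob.metadata is None when the object has no custom metadata
        if (blob.metadata or {}).get('epoch') != restore_epoch:
            continue
        dest = pathlib.Path(restore_location) / blob.name
        os.makedirs(dest.parent, exist_ok=True)  # build the local tree
        blob.download_to_filename(str(dest))
        print('Downloaded {} (epoch {})'.format(blob.name, restore_epoch))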


GCS has no notion of "directories," although tools like gsutil pretend it does for convenience. If you want all of the objects under "path/nfs/media/docs/", you can specify that as a prefix, like so:

request = service.objects().list(
    bucket=bucket_name,
    versions=True,
    prefix='nfs/media/docs/',  # Only show objects beginning like this
    delimiter='/'  # Consider this character a directory marker.
)
response = request.execute()
subdirectories = response['prefixes']
objects = response['items']

Because of the prefix parameter, only objects whose names begin with "nfs/media/docs" will be returned in response['items']. Because of the delimiter parameter, "subdirectories" will be returned in response['prefixes']. You can find more details in the documentation for the Python objects.list method.
If you were to use the newer google-cloud Python library, which I recommend for new code, the same call would look quite similar:
from google.cloud import storage

client = storage.Client()
bucket = client.bucket(bucket_name)
iterator = bucket.list_blobs(
    versions=True,
    prefix='nfs/media/docs/',
    delimiter='/'
)
subdirectories = iterator.prefixes
objects = list(iterator)

Thanks for the reply. To explain my specific use case a bit further: I have a cron job that backs up an NFS server's file directory using gsutil. The job runs once a day and backs the data up to a version-enabled bucket. The bucket keeps 8 versions of each asset, letting me roll the entire directory tree back 8 days. Each object is tagged with custom metadata named "epoch". - glux
Based on that tag, I'm able to walk the entire bucket and download only the items whose epoch metadata tag matches. I can't use gsutil because it doesn't support downloading by a specific version or metadata tag. My directory tree is always changing, so specifying a prefix won't work either. I need a way to walk the bucket objects, match on the epoch metadata tag, and finally download the objects while recreating an ever-changing directory structure. Any ideas? - glux
Let me see if I understand correctly. You have an NFS directory tree. It's synced to a GCS bucket. Every day you sync the NFS directory tree to the bucket, but keep versioning. You want to be able to restore the state of the NFS directory tree from a given day? - Brandon Yarbrough
Exactly. I'm actually backing up the directory tree with the gsutil cp command, so that each time the backup job runs I get a full copy. That way, when I restore, all objects are restored, not just new/changed ones. Each time the 'gsutil cp' command runs, it attaches the same metadata tag (epoch) to every object. When I run a list on the bucket, I can loop over every object and download only the objects with that metadata tag. This worked in my original code, but because files live in "folders", the download failed since my code couldn't create the directory structure. - glux
Why not have your code create the directory structure? You can call list() again on prefix + response['prefixes'][0]. - Brandon Yarbrough
Thanks Brandon. Could you provide an example of using prefix + response['prefixes'][0] to get a list of prefixes? I would then take that list and call os.mkdir to create the directories. When running response['prefixes'][0] I'm able to get the value of a single directory; I would then have to modify the prefix to go deeper. response = request.execute() subdirectories = response['prefixes'] print(subdirectories) - glux
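
To illustrate Brandon's suggestion, here is a hedged sketch of walking the "subdirectories" recursively by re-issuing list() with each returned prefix. walk_prefixes is a hypothetical helper name, and service, bucket_name, and restore_location are assumed to be set up as in the accepted answer:

def walk_prefixes(prefix=''):
    # Hypothetical helper: recursively collect every "subdirectory"
    # under the given prefix by re-listing with each returned prefix
    # (pagination via nextPageToken omitted for brevity)
    request = service.objects().list(
        bucket=bucket_name,
        prefix=prefix,
        delimiter='/'
    )
    response = request.execute()
    prefixes = []
    for sub in response.get('prefixes', []):
        prefixes.append(sub)
        prefixes.extend(walk_prefixes(sub))  # descend one level deeper
    return prefixes


# The returned prefixes can then be used to rebuild the local tree
for sub in walk_prefixes():
    os.makedirs(os.path.join(restore_location, sub), exist_ok=True)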


The solution below worked for me. I recursively download all blobs from a path prefix into a model folder in the project root, while maintaining the folder structure. Multiple blobs are downloaded concurrently.

The GCS client version is google-cloud-storage==1.41.1

import logging
import os
from datetime import datetime
from google.cloud import storage
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

BUCKET_NAME = "ml-model"
# ML_MODEL_NAME was undefined in the original post; defined here so the
# script runs
ML_MODEL_NAME = os.environ.get("ML_MODEL_NAME", "my-model")

def timer(func):
    def time_wrapper(*arg, **kwargs):
        start = datetime.now()
        func(*arg, **kwargs)
        diff = datetime.now() - start
        logger.info(f"{func.__name__} took {diff.seconds} s and {diff.microseconds//1000} ms")
    return time_wrapper

def fetch_environment() -> str:
    env = os.environ.get("environment", "staging")
    return env


def create_custom_folder(dir_name: str):
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)


def fetch_gcs_credential_file_path():
    return os.environ.get("GCS_CREDENTIAL_FILE_PATH")


class GCS:
    def __init__(self):
        cred_file_path = fetch_gcs_credential_file_path()
        self.client = storage.Client.from_service_account_json(cred_file_path)
        self.bucket = self.client.bucket(BUCKET_NAME)

    def download_blob(self, blob):
        # Strip the listing prefix, then mirror any remaining
        # "directory" components under the local model/ folder
        filename = blob.name.replace(self.path_prefix, '')
        delimiter_based_splits = filename.split('/')
        if len(delimiter_based_splits) > 1:
            dir_name = "model/" + "/".join(delimiter_based_splits[:-1])
            create_custom_folder(dir_name)
            blob.download_to_filename(f"{dir_name}/{delimiter_based_splits[-1]}")
        else:
            blob.download_to_filename("model/" + filename)

    @timer
    def download_blobs_multithreaded(self, prefix: str):
        # Create the destination folder if it does not exist
        create_custom_folder("model")

        blobs = self.bucket.list_blobs(prefix=prefix)

        self.path_prefix = prefix
        with ThreadPoolExecutor() as executor:
            executor.map(self.download_blob, blobs)


def download_model():
    env = fetch_environment()
    folder_path_prefix = f"ml/{env}/{ML_MODEL_NAME}/v1/tf-saved-model/"
    gcs = GCS()
    gcs.download_blobs_multithreaded(folder_path_prefix)

if __name__ == '__main__':
    download_model()
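
One caveat worth noting: executor.map returns a lazy iterator, so an exception raised inside download_blob only surfaces when its results are consumed. If you want failed downloads to raise rather than disappear silently, drain the iterator, e.g. by changing the last lines of download_blobs_multithreaded to:

        with ThreadPoolExecutor() as executor:
            # Consuming the iterator re-raises any exception from a worker
            list(executor.map(self.download_blob, blobs))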

