如何将 Azure Blob 流式传输到 AWS S3?

5

我需要将一个大的Azure Blob复制到AWS S3,而不在内存中保留其副本。通过一些搜索,我已经找到了一堆例子并将它们结合在下面的脚本中。但这仍会将数据加载到内存中。有没有好的方法来避免这种情况?

import boto3
from azure.storage.blob import BlobClient

with io.BytesIO() as input_stream, io.BytesIO() as output_stream:
    blob_client = BlobClient.from_connection_string(
        conn_str=AZURE_CONNECTION_STRING,
        container_name=container,
        blob_name=filename,
    )
    blob_client.download_blob().readinto(input_stream)

    input_stream.seek(0)
    shutil.copyfileobj(input_stream, output_stream)
    output_stream.seek(0)

    boto3.resource("s3").Object(BUCKET_NAME, s3_key).put(Body=output_stream)

你的例子对我没有任何帮助。你如何将同步广播转换为非同步用户?在流中,缓冲内存是必需的。即使你不这样做,他也会制造硬件。没有资源管理,你无法提供流媒体服务。((延迟*FPS) == 缓冲区大小)。(在你的代码中已经有重复数据,你不能读取不存在的数据。如果你读取了这些数据,你将会得到两份数据。) - dsgdfg
我不熟悉Azure Blob [我的问题可能不太合理],但我熟悉AWS,尤其是S3。最近,我已将一个S3存储桶挂载为文件系统,在类似于您描述的情况下它可以完美地工作。这个方法对你有用吗? - Dan M
@JamesMead 代码将作为 Lambda 运行。 - Andrei
3个回答

3

如果您似乎是一次性读取 Blob,请注意 Blob 的副本在内存中。您正在初始化两个 io.BytesIO 实例,但然后使用 blob_client.download_blob().readinto(input_stream) 读取整个 Blob。

我认为您应该尝试的是按块读取(和放置)Blob,一次一个块,避免将其全部读入内存。

对于上传方面(S3),您可以通过两种方式解决问题。您可以:

  • 使用 S3 部分(分块)上传机制(使用 .upload() 进行初始化,然后使用 .upload_part() 上载每个部分(块),或
  • .upload_fileobj() 提供一个类似文件的对象,其将负责一次提供一个块

据我所知,似乎 blob_client.download_blob() 已经返回一个名为 StorageStreamDownloader 的类似文件的对象,实现了一个 chunks() 方法。我找不到适当的文档,但根据源代码,它似乎正在返回一个您可以使用的迭代器。

因此,考虑像这样做(我现在无法访问任何 Azure/S3 服务,因此该代码可能无法直接使用):

import boto3
from boto3.s3.transfer import TransferConfig, S3Transfer

blob_client = BlobClient.from_connection_string(
    conn_str=AZURE_CONNECTION_STRING,
    container_name=container,
    blob_name=filename,
)
s3 = boto3.resource('s3')

mpu = s3.create_multipart_upload(Bucket=BUCKET_NAME, Key=s3_key)
mpu_id = mpu["UploadId"]

blob = blob_client.download_blob()
for part_num, chunk in enumerate(blob.chunks()):
    s3.upload_part(
        Body=chunk,
        Bucket=BUCKET_NAME,
        Key=s3_key,
        UploadId=mpu_id,
        PartNumber=part_num,
    )

就像我之前提到的 - 我现在无法访问任何blob存储/S3资源,所以我仅仅是眼观代码。但是基本思路应该是相同的。通过使用blob的 .chunks() 方法,您只需将数据的一个小块获取到内存中,然后将其上传(使用MPU)到S3,并立即丢弃。


这个可行的流媒体是否经过验证?如果没有,我愿意支付几百点赏金来验证一下。我最大的问题是:boto3如何知道流媒体已经完成? - SeaDude

0

根据这里的samu的回答,这是一个可行的例子,其中有一个缺失的部分与完成多部分上传相关。

def copy_from_azure_to_s3(conn_str:str,container_name:str,file_name:str,bucket_name:str,s3):

    #initiate Azure client
    blob_client = BlobClient.from_connection_string(
        conn_str=con_string,
        container_name=container_name,
        blob_name=file_name,
        max_chunk_get_size=50*1024*1024 #min size for multipart upload is 5MB, it needs to be higher than that
    )

    #define multipart upload
    mpu = s3.create_multipart_upload(Bucket=bucket_name, Key=file_name)
    mpu_id = mpu["UploadId"]

    blob = blob_client.download_blob()

    #store info about individual parts
    etags=[]

    #stream it to s3
    for part_num, chunk in enumerate(blob.chunks(), start=1):
        response= s3.upload_part(
            Body=chunk,
            Bucket=bucket_name,
            Key=file_name,
            UploadId=mpu_id,
            PartNumber=part_num,
        )
        etags.append({'ETag': response['ETag'],'PartNumber':part_num})

    #finish the upload
    s3.complete_multipart_upload(
        Bucket=bucket_name,
        Key=file_name,
        UploadId=mpu_id,
        MultipartUpload={
            'Parts': etags
        },

    )

0
有一种非常简单的方法可以做到这一点。
import temp

blob_client = ...
s3_client = ...

with tempfile.NamedTemporaryFile() as temp_file:
    for chunk in blob_client.download_blob().chunks():
        # download in chunk and flush it into hard disk from memory
        temp_file.write(chunk)
        temp_file.flush()

    # the below upload automatically handles the multi-part uploading
    s3_client.upload(temp_file.name, s3_bucket, s3_key) 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接