从 Google Cloud 存储桶复制到 S3 存储桶

3
我已设置了一个Airflow工作流,将一些文件从S3导入到Google Cloud存储,并运行一系列SQL查询的工作流程以在Big Query上创建新表。在工作流结束时,我需要将一个最终的Big Query表的输出推送到Google Cloud Storage,然后再从那里推送到S3。
我已经使用BigQueryToCloudStorageOperator Python操作符成功地传输了Big Query表格到Google Cloud Storage,但似乎从Google Cloud Storage到S3的传输是不太常见的路线,我无法找到一个可以在我的Airflow工作流中自动化的解决方案。
我知道gsutil的一部分是rsync,并且已经使其工作(请参见Exporting data from Google Cloud Storage to Amazon S3),但我无法将其添加到我的工作流中。
我有一个运行在计算引擎实例上的docker化的Airflow容器。
非常感谢您解决这个问题。
非常感谢!
4个回答

8

因此,我们也使用rsync在S3和GCS之间传输数据。

您首先需要编写一个bash脚本,类似于gsutil -m rsync -d -r gs://bucket/key s3://bucket/key

对于s3,您还需要提供AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY作为环境变量。

然后定义您的BashOperator并将其放入DAG文件中。

rsync_yesterday = BashOperator(task_id='rsync_task_' + table,
                                bash_command='Your rsync script',
                                dag=dag)

谢谢成志 - 你如何在计算引擎上定义S3凭证?在我的Mac本地,我已经将它们添加到了.boto配置文件中,但是我无法在计算引擎实例中找到它的等效物。 - D_usv
你能尝试通过ssh在计算引擎上添加.boto配置文件吗? - Chengzhi
我已经使用DataProc搭建了一个Hadoop集群,并使用distcp进行了传输。一旦我成功使用airflow的DataProcHadoopOperator,我会更新答案。 - D_usv
你也可以在bash命令中使用export命令来处理凭证。bash_command='export AWS_ACCESS_KEY_ID="your_key"; export AWS_SECRET_ACCESS_KEY="your_secret"; gsutil -m rsync -d -r gs://bucket/key s3://bucket/key' - Kannappan Sirchabesan

0

0

我有一个需求,需要使用AWS Lambda将GC存储桶中的对象复制到S3。

Python boto3库允许从GC存储桶中列出和下载对象。

以下是示例Lambda代码,用于将“sample-data-s3.csv”对象从GC存储桶复制到s3存储桶。

import boto3
import io

s3 = boto3.resource('s3')

google_access_key_id="GOOG1EIxxMYKEYxxMQ"
google_access_key_secret="QifDxxMYSECRETKEYxxVU1oad1b"

gc_bucket_name="my_gc_bucket"


def get_gcs_objects(google_access_key_id, google_access_key_secret,
                     gc_bucket_name):
    """Gets GCS objects using boto3 SDK"""
    client = boto3.client("s3", region_name="auto",
                          endpoint_url="https://storage.googleapis.com",
                          aws_access_key_id=google_access_key_id,
                          aws_secret_access_key=google_access_key_secret)

    # Call GCS to list objects in gc_bucket_name
    response = client.list_objects(Bucket=gc_bucket_name)

    # Print object names
    print("Objects:")
    for blob in response["Contents"]:
        print(blob)    

    object = s3.Object('my_aws_s3_bucket', 'sample-data-s3.csv')
    f = io.BytesIO()
    client.download_fileobj(gc_bucket_name,"sample-data.csv",f)
    object.put(Body=f.getvalue())

def lambda_handler(event, context):
    get_gcs_objects(google_access_key_id,google_access_key_secret,gc_bucket_name) 

您可以循环遍历blob以从GC存储桶下载所有对象。

希望这能帮助想要使用AWS Lambda将对象从GC存储桶传输到S3存储桶的人。


-1
Google建议使用其“传输服务”在云平台之间进行传输。您可以使用他们的Python API编程设置传输。这样,数据直接在S3和Google云存储之间传输。使用“gsutil”和“rsync”的缺点是数据必须通过执行“rsync”命令的机器/实例。这可能会成为瓶颈。 Google Cloud Storage Transfer Service Doc

11
我认为该转移服务只支持从S3到GCS,不支持从GCS到S3。不过我可能错了。 - D_usv
3
你是正确的。根据这份Google转移服务文档,只有一个GCS接收器。https://cloud.google.com/storage/transfer/reference/rest/v1/TransferSpec - Jean-Christophe Rodrigue

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接