如何通过Python Boto3将数据加载到Amazon Redshift?

18
在Amazon Redshift的入门指南中,数据从Amazon S3中提取并使用SQLWorkbench/J加载到Amazon Redshift集群中。我想模仿连接到集群并使用Boto3将示例数据加载到集群的相同过程。
然而,在Boto3的文档中,我无法找到一种允许我将数据上传到Amazon Redshift集群的方法。
我已经能够使用以下代码连接到Redshift并利用Boto3进行操作:
client = boto3.client('redshift')

但我不确定有什么方法可以让我像在SQLWorkbenchJ教程中那样创建表格或上传数据到Amazon Redshift。


就此而言,AWS SDK for Pandas(awswrangler)具有适当的方法,例如copy_from_files() - 00schneider
3个回答

25

没错,您需要使用psycopg2 Python 模块来执行 COPY 命令。

我的代码看起来像这样:

import psycopg2
#Amazon Redshift connect string 
conn_string = "dbname='***' port='5439' user='***' password='***' host='mycluster.***.redshift.amazonaws.com'"  
#connect to Redshift (database should be open to the world)
con = psycopg2.connect(conn_string);
sql="""COPY %s FROM '%s' credentials 
      'aws_access_key_id=%s; aws_secret_access_key=%s'
       delimiter '%s' FORMAT CSV %s %s; commit;""" % 
      (to_table, fn, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,delim,quote,gzip)

#Here
#  fn - s3://path_to__input_file.gz
#  gzip = 'gzip'

cur = con.cursor()
cur.execute(sql)
con.close() 

我使用boto3/psycopg2编写了CSV_Loader_For_Redshift


1
使用psycopg2,我们需要将VPC添加到Lambda函数中吗? - Minesh Barot

12

回到您提供的教程中的第4步骤。看到它如何显示集群URL吗?您必须使用PostgreSQL驱动程序连接到该URL。AWS SDK(例如Boto3)提供对AWS API的访问。您需要通过PostgreSQL API连接到Redshift,就像您连接到RDS上的PostgreSQL数据库一样。


谢谢!我现在正在尝试弄清楚如何使用SQLAlchemy而不是SQLWorkbenchJ,因为我在使用SQLWorkbenchJ时遇到了一些困难(我可能会编辑这个问题或创建一个新的问题)。 - Chris
如果您有新的问题,请创建一个新的问题。请不要编辑现有的问题来询问新问题。 - Mark B
使用Aginity Workbench for Redshift连接到集群。它具有更好的用户界面和功能,并且也被AWS推荐使用。 - Paladin

1

使用psycopg2 & get_cluster_credentials

前提条件 -

  • 已将IAM角色附加到相应的用户

    IAM角色具有get_cluster_credentials策略LINK

  • 在云(EC2)上,已附加适当的IAM角色

如果您将其部署在已配置了用户AWS凭据的PC / VM上[ CLI - aws configure ]或 您位于同一帐户,VPC中的实例,则以下代码仅起作用。

  1. Have a config.ini file -

     [Redshift]
    
     port = 3389
    
     username = please_enter_username
    
     database_name = please_database-name
    
     cluster_id = please_enter_cluster_id_name
    
     url = please_enter_cluster_endpoint_url
    
     region = us-west-2
    
  2. My Redshift_connection.py

     import logging
    
     import psycopg2
    
     import boto3
    
     import ConfigParser
    
    
     def db_connection():
        logger = logging.getLogger(__name__)
    
        parser = ConfigParser.ConfigParser()
    
        parser.read('config.ini')
    
        RS_PORT = parser.get('Redshift','port')
    
        RS_USER = parser.get('Redshift','username')
    
        DATABASE = parser.get('Redshift','database_name')
    
        CLUSTER_ID = parser.get('Redshift','cluster_id')
    
        RS_HOST = parser.get('Redshift','url')
    
        REGION_NAME = parser.get('Redshift','region')
    
        client = boto3.client('redshift',region_name=REGION_NAME)
    
        cluster_creds = client.get_cluster_credentials(DbUser=RS_USER,
                                                    DbName=DATABASE,
                                                    ClusterIdentifier=CLUSTER_ID,
                                                    AutoCreate=False)
    
     try:
       conn = psycopg2.connect(
         host=RS_HOST,
         port=RS_PORT,
         user=cluster_creds['DbUser'],
         password=cluster_creds['DbPassword'],
         database=DATABASE
       )
    
       return conn
     except psycopg2.Error:
       logger.exception('Failed to open database connection.')
       print "Failed"
    
  3. Query Execution script -

     from Redshift_Connection import db_connection
    
     def executescript(redshift_cursor):
         query = "SELECT * FROM <SCHEMA_NAME>.<TABLENAME>"
         cur=redshift_cursor
         cur.execute(query)
    
     conn = db_connection()
     conn.set_session(autocommit=False)
     cursor = conn.cursor()
     executescript(cursor)
     conn.close()
    

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接