从Beam管道连接Google Cloud SQL Postgres实例

4
我想在运行于Google Dataflow的Apache Beam管道中,使用Python SDK连接Google Cloud SQL Postgres实例。
我找不到适当的文档。
在Cloud SQL的指南中,我没有看到任何关于Dataflow的文档。
https://cloud.google.com/sql/docs/postgres/
有人能提供文档链接或GitHub示例吗?
2个回答

6

您可以使用来自beam-nuggetsrelational_db.Writerelational_db.Read转换:

首先安装beam-nuggets:

pip install beam-nuggets

阅读:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name='months',
    )
    records | 'Writing to stdout' >> beam.Map(print)

写作:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    months = p | "Reading month records" >> beam.Create([
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ])
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
        create_if_missing=True,
    )
    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True
    )
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )

1
为了使用Python SDK从Dataflow工作器连接到Cloud SQL DB,我不得不在所有工作器上安装和运行cloud_sql_proxy CLI(或者将DB的所有IP列入白名单,但这并不理想)。我知道Java SDK内置了一些代理/套接字功能;@mohaseeb,你知道Python是否有类似的技巧吗?顺便说一下,我正在使用beam_nuggets,除了那个独立的代理问题外,它的工作非常出色 :) - chmod_007
2
@chmod_007 我不知道是否有类似的功能。但是,你可以通过启用Cloud SQL数据库上的“私有IP”连接,并确保Cloud SQL数据库和Dataflow工作器位于相同的Google Cloud区域(并使用其私有IP连接到数据库),而无需“使用cloud_sql_proxy”或“将worker IP列入白名单”,从你的管道连接到DB。有关更多信息,请访问https://cloud.google.com/sql/docs/postgres/private-ip。 - mohaseeb
@chmod_007 很高兴看到你觉得 beam_nuggets 很有用;我会在假期抽出一些时间来继续开发它。 - mohaseeb
最新的文档显示,relational_db.Read 方法已经更改为 relational_db.ReadFromDB:http://mohaseeb.com/beam-nuggets/beam_nuggets.io.relational_db.html#beam_nuggets.io.relational_db.ReadFromDB - Gino Mempin

1
Java SDK包含JdbcIO,该工具可连接到任何可以通过标准Java JDBC机制访问的数据库。目前,在Beam Python SDK中没有类似的工具。如果有的话,我想它可能会使用Python DB-API。欢迎提交功能请求或参与贡献-开发应该相当简单(例如通过模拟Java JdbcIO的源代码),但非常有用 :)

谢谢@jkff 我正在尝试使用Java SDK进行连接。 我能够使用代理连接到Postgres实例并在本地运行pipline。 但是,在部署到Dataflow后,是否有任何方式可以使Beam管道直接连接到Cloud SQL Postgres? 我尝试按照以下步骤进行操作: https://cloud.google.com/appengine/docs/flexible/java/using-cloud-sql-postgres 但它给我返回了Google的未知主机。 您能否指向连接Dataflow到Postgres Cloud SQL的文档? - Sandesh
你能否发一篇新问题,详细说明在尝试将在本地工作的相同管道运行到Dataflow时出现了什么问题? - jkff

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接