Launching a CloudSQL proxy on Python Dataflow / Apache Beam

I am currently working on an ETL Dataflow job (using the Apache Beam Python SDK) which queries data from CloudSQL (with psycopg2 and a custom ParDo) and writes it to BigQuery. My goal is to create a Dataflow template that I can start from App Engine using a cron job.
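For context, launching such a template from an App Engine cron handler typically goes through the Dataflow templates API; here is a minimal, hedged sketch (project, location, GCS path, and job name are illustrative placeholders, not values from this post):

from googleapiclient.discovery import build

def launch_etl_template():
    # Launch a previously staged Dataflow template (all identifiers are illustrative).
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().locations().templates().launch(
        projectId='my-project',
        location='europe-west1',
        gcsPath='gs://my-bucket/templates/cloudsql_to_bq',
        body={'jobName': 'cloudsql-to-bq-etl', 'parameters': {}})
    return request.execute()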
I have a version that works locally with the DirectRunner. For that I use the CloudSQL (Postgres) proxy client so that I can connect to the database on 127.0.0.1.
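With the proxy running locally, the connection is an ordinary TCP connection to localhost; a minimal sketch of what that looks like with psycopg2 (database name and credentials are illustrative placeholders, assuming the proxy listens on the default Postgres port):

import psycopg2

# The local Cloud SQL proxy forwards 127.0.0.1:5432 to the instance.
con = psycopg2.connect(
    host='127.0.0.1',
    port=5432,
    dbname='my_database',
    user='my_user',
    password='my_password')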
When starting the job with the DataflowRunner and a custom command in the setup.py script that launches the proxy, the job does not execute. It gets stuck repeating this log message:

Setting node annotation to enable volume controller attach/detach

Part of my setup.py looks like this:
import logging
import subprocess

import setuptools

CUSTOM_COMMANDS = [
['echo', 'Custom command worked!'],
['wget', 'https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64', '-O', 'cloud_sql_proxy'],
['echo', 'Proxy downloaded'],
['chmod', '+x', 'cloud_sql_proxy']]

class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    logging.info("Running custom commands")
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)
    subprocess.Popen(['./cloud_sql_proxy', '-instances=bi-test-1:europe-west1:test-animal=tcp:5432'])

I added the last line as a separate subprocess.Popen() within run() after reading this issue by sthomp on GitHub and this discussion on Stack Overflow. I also tried tweaking some parameters of subprocess.Popen.
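For completeness, a command class like this is usually wired into setup.py following the Apache Beam juliaset example, so that the custom commands run as part of the build step on the workers; a minimal sketch (the package name and dependency list are illustrative):

import setuptools
from distutils.command.build import build as _build

class build(_build):
  """Register the custom commands as a sub-command of build."""
  sub_commands = _build.sub_commands + [('CustomCommands', None)]

setuptools.setup(
    name='cloudsql-to-bq-etl',  # illustrative
    version='0.0.1',
    install_requires=['psycopg2'],
    packages=setuptools.find_packages(),
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    })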

Another solution, proposed by brodin, is to allow access from every IP address and to connect via username and password. However, as I understand it, he does not consider this best practice.

Thank you very much for your help.

!!! There is a working solution at the bottom of this post !!!


UPDATE - Log files

These are the error-level logs that occurred during the job:


E  EXT4-fs (dm-0): couldn't mount as ext3 due to feature incompatibilities 
E  Image garbage collection failed once. Stats initialization may not have completed yet: unable to find data for container / 
E  Failed to check if disk space is available for the runtime: failed to get fs info for "runtime": unable to find data for container / 
E  Failed to check if disk space is available on the root partition: failed to get fs info for "root": unable to find data for container / 
E  [ContainerManager]: Fail to get rootfs information unable to find data for container / 
E  Could not find capacity information for resource storage.kubernetes.io/scratch 
E  debconf: delaying package configuration, since apt-utils is not installed 
E    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
E                                   Dload  Upload   Total   Spent    Left  Speed 
E  
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  3698  100  3698    0     0  25674      0 --:--:-- --:--:-- --:--:-- 25860 



#-- HERE IS WHEN setup.py FOR MY JOB IS EXECUTED ---

E  debconf: delaying package configuration, since apt-utils is not installed 
E  insserv: warning: current start runlevel(s) (empty) of script `stackdriver-extractor' overrides LSB defaults (2 3 4 5). 
E  insserv: warning: current stop runlevel(s) (0 1 2 3 4 5 6) of script `stackdriver-extractor' overrides LSB defaults (0 1 6). 
E  option = Interval; value = 60.000000; 
E  option = FQDNLookup; value = false; 
E  Created new plugin context. 
E  option = PIDFile; value = /var/run/stackdriver-agent.pid; 
E  option = Interval; value = 60.000000; 
E  option = FQDNLookup; value = false; 
E  Created new plugin context. 

Here are all the logs after the start of my custom setup.py (log level: any; all logs):

https://jpst.it/1gk2Z

UPDATE - Log files 2

Job logs (I canceled the job manually because it did not make any progress after a while):

 2018-06-08 (08:02:20) Autoscaling is enabled for job 2018-06-07_23_02_20-5917188751755240698. The number of workers will b...
 2018-06-08 (08:02:20) Autoscaling was automatically enabled for job 2018-06-07_23_02_20-5917188751755240698.
 2018-06-08 (08:02:24) Checking required Cloud APIs are enabled.
 2018-06-08 (08:02:24) Checking permissions granted to controller Service Account.
 2018-06-08 (08:02:25) Worker configuration: n1-standard-1 in europe-west1-b.
 2018-06-08 (08:02:25) Expanding CoGroupByKey operations into optimizable parts.
 2018-06-08 (08:02:25) Combiner lifting skipped for step Save new watermarks/Write/WriteImpl/GroupByKey: GroupByKey not fol...
 2018-06-08 (08:02:25) Combiner lifting skipped for step Group watermarks: GroupByKey not followed by a combiner.
 2018-06-08 (08:02:25) Expanding GroupByKey operations into optimizable parts.
 2018-06-08 (08:02:26) Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
 2018-06-08 (08:02:26) Annotating graph with Autotuner information.
 2018-06-08 (08:02:26) Fusing adjacent ParDo, Read, Write, and Flatten operations
 2018-06-08 (08:02:26) Fusing consumer Get rows from CloudSQL tables into Begin pipeline with watermarks/Read
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/Write into Group watermarks/Reify
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/GroupByWindow into Group watermarks/Read
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WriteBundles/WriteBundles into Save new watermar...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/GroupByWindow into Save new watermark...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Reify into Save new watermarks/Write/...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Write into Save new watermarks/Write/...
 2018-06-08 (08:02:26) Fusing consumer Write to BQ into Get rows from CloudSQL tables
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/Reify into Write to BQ
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/Map(<lambda at iobase.py:926>) into Convert dict...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WindowInto(WindowIntoFn) into Save new watermark...
 2018-06-08 (08:02:26) Fusing consumer Convert dictionary list to single dictionary and json into Remove "watermark" label
 2018-06-08 (08:02:26) Fusing consumer Remove "watermark" label into Group watermarks/GroupByWindow
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/InitializeWrite into Save new watermarks/Write/W...
 2018-06-08 (08:02:26) Workflow config is missing a default resource spec.
 2018-06-08 (08:02:26) Adding StepResource setup and teardown to workflow graph.
 2018-06-08 (08:02:26) Adding workflow start and stop steps.
 2018-06-08 (08:02:26) Assigning stage ids.
 2018-06-08 (08:02:26) Executing wait step start25
 2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/DoOnce/Read+Save new watermarks/Write/WriteI...
 2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/GroupByKey/Create
 2018-06-08 (08:02:26) Starting worker pool setup.
 2018-06-08 (08:02:26) Executing operation Group watermarks/Create
 2018-06-08 (08:02:26) Starting 1 workers in europe-west1-b...
 2018-06-08 (08:02:27) Value "Group watermarks/Session" materialized.
 2018-06-08 (08:02:27) Value "Save new watermarks/Write/WriteImpl/GroupByKey/Session" materialized.
 2018-06-08 (08:02:27) Executing operation Begin pipeline with watermarks/Read+Get rows from CloudSQL tables+Write to BQ+Gr...
 2018-06-08 (08:02:36) Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently runnin...
 2018-06-08 (08:02:46) Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently runnin...
 2018-06-08 (08:03:05) Workers have started successfully.
 2018-06-08 (08:11:37) Cancel request is committed for workflow job: 2018-06-07_23_02_20-5917188751755240698.
 2018-06-08 (08:11:38) Cleaning up.
 2018-06-08 (08:11:38) Starting worker pool teardown.
 2018-06-08 (08:11:38) Stopping worker pool...
 2018-06-08 (08:12:30) Autoscaling: Reduced the number of workers to 0 based on the rate of progress in the currently runni...

Stack traces:

No errors have been received in this time period.

UPDATE: The solution can be found in my answer below


@komarkovich Thank you for your comment! Is there a proper way I can provide you with the log files? The worker itself does not show any logs, probably because it has not started yet. I can't post all the logs of the system, kubelet, etc. here because they are too long. - Thomas Schmidt
I need you to provide the logs of the Dataflow job that failed. You can find them in the job logs at https://console.cloud.google.com/dataflow?jobsDetail/locations/<ZONE>/jobs/<JOB_ID>?project=<PROJECT_NAME>. There should be some errors that tell us what is going on. You don't have to post all the logs (just the most relevant ones). If there are too many, you can use the justPasteIt tool to share them here. - komarkovich
I updated the post with the log files (thanks for the justpaste.it tip). I copied the logs from the log viewer. Unfortunately, the link you provided above, with my specifications filled in, always lands on the job list. - Thomas Schmidt
Thanks for that, but that is not what I was asking for. Please post the Dataflow logs. Sorry about that link, this one should be correct: https://console.cloud.google.com/dataflow/jobsDetail/locations/<ZONE>/jobs/<JOB_ID>?project=<PROJECT_NAME>. Find the logs of that job there and provide the stack trace. - komarkovich
Thank you for your feedback. I added the job logs and the stack traces (no errors are shown). Do you have any ideas? - Thomas Schmidt
3 Answers


SOLUTION:

I finally found a workaround. I decided to connect via the public IP of my CloudSQL instance. For that you need to allow connections to your CloudSQL instance from every IP:

  1. Go to the overview page of your CloudSQL instance in GCP
  2. Click on the Authorization tab
  3. Click on Add network and add 0.0.0.0/0 (!! this allows every IP address to connect to your instance !!)

To add security to the process, I used SSL keys and only allowed SSL connections to the instance:

  1. Click on the SSL tab
  2. Click on Create a new certificate to create an SSL certificate for your server
  3. Click on Create a client certificate to create an SSL certificate for your client
  4. Click on Allow only SSL connections to reject all non-SSL connection attempts

After that I stored the certificates in a Google Cloud Storage bucket and load them before connecting within the Dataflow task, i.e.:

import os
import select
import stat

import psycopg2
import psycopg2.extensions
from google.cloud import storage

# Function to wait until the asynchronous connection is ready when processing in parallel
def wait(conn):
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)

# Function which returns a connection which can be used for queries
def connect_to_db(host, hostaddr, dbname, user, password, sslmode = 'verify-full'):

    # Get keys from GCS
    client = storage.Client()

    bucket = client.get_bucket(<YOUR_BUCKET_NAME>)

    bucket.get_blob('PATH_TO/server-ca.pem').download_to_filename('server-ca.pem')
    bucket.get_blob('PATH_TO/client-key.pem').download_to_filename('client-key.pem')
    os.chmod("client-key.pem", stat.S_IRWXU)
    bucket.get_blob('PATH_TO/client-cert.pem').download_to_filename('client-cert.pem')

    sslrootcert = 'server-ca.pem'
    sslkey = 'client-key.pem'
    sslcert = 'client-cert.pem'

    con = psycopg2.connect(
        host = host,
        hostaddr = hostaddr,
        dbname = dbname,
        user = user,
        password = password,
        sslmode=sslmode,
        sslrootcert = sslrootcert,
        sslcert = sslcert,
        sslkey = sslkey)
    return con

I then use these functions in a custom ParDo to perform the queries.
A minimal example:

import apache_beam as beam
from psycopg2.extras import RealDictCursor

class ReadSQLTableNames(beam.DoFn):
    '''
    parDo class to get all table names of a given cloudSQL database.
    It will return each table name.
    '''
    def __init__(self, host, hostaddr, dbname, username, password):
        super(ReadSQLTableNames, self).__init__()
        self.host = host
        self.hostaddr = hostaddr
        self.dbname = dbname
        self.username = username
        self.password = password

    def process(self, element):

        # Connect to the database
        con = connect_to_db(host = self.host,
            hostaddr = self.hostaddr,
            dbname = self.dbname,
            user = self.username,
            password = self.password)
        # Wait until the connection is ready (wait() is defined in the snippet above)
        wait(con)
        # Create cursor to query data
        cur = con.cursor(cursor_factory=RealDictCursor)

        # Get all table names
        cur.execute(
        """
        SELECT
        tablename as table
        FROM pg_tables
        WHERE schemaname = 'public'
        """
        )
        table_names = cur.fetchall()

        cur.close()
        con.close()
        for table_name in table_names:
            yield table_name["table"]

A part of the pipeline could then look like this:
# Current workaround to query all tables: 
# Create a dummy initiator PCollection with one element
init = p        |'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])

tables = init   |'Get table names' >> beam.ParDo(ReadSQLTableNames(
                                                host = known_args.host,
                                                hostaddr = known_args.hostaddr,
                                                dbname = known_args.db_name,
                                                username = known_args.user,
                                                password = known_args.password))
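The known_args used above would come from the usual Beam argument-parsing pattern; a minimal sketch of how those options might be defined (the argument names simply mirror the ones referenced above):

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument('--host')
parser.add_argument('--hostaddr')
parser.add_argument('--db_name')
parser.add_argument('--user')
parser.add_argument('--password')
known_args, pipeline_args = parser.parse_known_args()

p = beam.Pipeline(options=PipelineOptions(pipeline_args))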

I hope this solution helps others who run into similar problems.


Does this approach ensure that GCS's default encryption in transit is preserved when downloading the certificates to the Dataflow job? @komarkovich - ljhennessy
So it is not possible to achieve this with the setup.py file and the proxy configuration? - IoT user
@IoT user I have not found a solution with the proxy yet. I hope there will be a good way in the future, because I have recently run into some problems with this approach at work. Sometimes the downloaded files are empty and I have to add checks and retries. - Thomas Schmidt
Thanks @ThomasSchmidt. I hope Google works harder on this, because it still lags far behind the other two major cloud providers. - IoT user


I managed to find a better, or at least simpler, solution: use the Cloud SQL proxy in the DoFn's setup function to establish the connection up front.

import os

import apache_beam as beam

class MyDoFn(beam.DoFn):
    def setup(self):
        os.system("wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy")
        os.system("chmod +x cloud_sql_proxy")
        os.system(f"./cloud_sql_proxy -instances={self.sql_args['cloud_sql_connection_name']}=tcp:3306 &")

The job throws the error RuntimeError: mysql.connector.errors.InterfaceError: 2003: Can't connect to MySQL server on 'localhost:3306', even though it can access the tables. - Siddharth-Soni
For Dataflow with private IPs, I think one may need to add the proxy file to Cloud Storage. - sernle
@sernle Cloud NAT can make the above solution work with private-IP Dataflow, but if Cloud NAT is not an option, keeping the proxy file in Cloud Storage is a reasonable workaround. - BusiPlay
It helped me a lot. But on the last line I added: "-dir=/cloudsql". Thanks! - Max Zolotenko

