谷歌云Composer和谷歌云SQL

10
我们有哪些方法可以从新引入的Google Cloud Composer连接到Google Cloud SQL(MySQL)实例?目的是将Cloud SQL实例中的数据传输到BigQuery(可能需要通过Cloud Storage进行中介步骤)。
  1. Cloud SQL代理是否可以在托管Composer的Kubernetes集群的pods部分公开?
  2. 如果不行,是否可以使用Kubernetes服务经纪人引入Cloud SQL代理?-> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker
  3. 应该使用Airflow来调度和调用GCP API命令,例如1)将mysql表导出到云存储2)将mysql导出读入bigquery吗?
  4. 也许还有其他我忽略的方法来完成此操作。

Raj,你最终找到方法了吗?我也在尝试。 - Thibault Clement
6个回答

7
"The Cloud SQL Proxy可以安全地访问您的Cloud SQL Second Generation实例,无需将IP地址列入白名单或配置SSL。" -Google CloudSQL-Proxy Docs 在所有其他方式中,CloudSQL Proxy似乎是连接到CloudSQL的推荐方式。因此,在Composer中,从版本1.6.1开始,我们可以创建一个新的Kubernetes Pod来运行gcr.io/cloudsql-docker/gce-proxy:latest镜像,通过服务进行公开,然后创建一个连接以在操作员中使用。
设置步骤如下:
  • 请遵循 Google文档 中的指示。

  • 使用 Arik的Medium文章 中的信息测试连接。

    • 检查 Pod 是否已创建 kubectl get pods --all-namespaces

    • 检查 Service 是否已创建 kubectl get services --all-namespaces

    • 进入一个工作节点 kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash

      • 测试 mysql 连接 mysql -u composer -p --host <service-name>.default.svc.cluster.local

注意:

  • 现在的Composer使用命名空间来组织Pods

  • 不同命名空间中的Pods 无法互相通信,除非您提供完整路径<k8-service-name>.<k8-namespace-name>.svc.cluster.local

  • 使用完整路径创建一个新的Composer连接将使连接成功


对我来说有效,唯一需要更改的是用户“composer”不存在,而是使用没有密码的“root”用户。 - Benos
谷歌文档的链接[https://github.com/GoogleCloudPlatform/cloudsql-proxy/blob/master/Kubernetes.md]无法访问。 - lordvcs
同样,我的<k8-namespace-name>也是“default”,用户名不是“composer”,而是“root”。我的composer环境是composer-1-16-5-airflow-1-10-15。 - Kavi Sek
感谢@Micah Miller,这个方法非常有效。问题出在命名空间上。CloudSqlProxy部署及其服务运行在默认命名空间中,而airflow-scheduler则在另一个命名空间中,只有在提供完整路径<k8-service-name>.<k8-namespace-name>.svc.cluster.local后连接才成功。 - Surabhi Sharma

4

我们曾经遇到过相同的问题,但是是在一个Postgres实例中。这是我们所做的,并使其正常工作:

  • 在airflow运行的Kubernetes集群中创建一个sqlproxy部署。这是现有的airflow_db连接使用的airflow-sqlproxy的副本,文件中需要进行以下更改:

    • 将所有airflow-sqlproxy实例替换为新代理名称
    • 在“spec:template:spec:containers:command:-instances”下进行编辑,将现有实例名称替换为我们要连接的新实例
  • 创建一个kubernetes服务,同样是现有的airflow-sqlproxy-service的副本,并进行以下更改:

    • 将所有airflow-sqlproxy实例替换为新代理名称
    • 在“spec:ports”下更改为适当的端口(对于Postgres实例,我们使用了5432)
  • 在airflow UI中添加一个类型为Postgres的连接,其中主机设置为新创建的服务名称。


1
我在哪里可以找到Airflow-SQLProxy和Service的YAML文件? - Leo
2
在 Kubernetes 引擎下 > 服务 - joaopcoelho
3
这也是我最好的工作方式。但是,重要提示:当前版本的Cloud Composer在其自己的命名空间中运行其组件。确保您将YAML中的默认命名空间更改为CC使用的命名空间。否则,DNS解析将失败。 - t-h-

2

您可以按照这些说明在集群中启动新的Cloud SQL代理实例。

关于#3:听起来是个好计划。据我所知,没有Cloud SQL到BigQuery运算符,因此您必须像您描述的那样分两个阶段进行。


这就是我们连接到(postgre) CloudSQL实例的方式。 - HulaHoof
我按照上面的指示成功地使用MySQL CLI从调度程序(ssh进入)连接到了Cloud SQL,但实际任务中的Pod无法连接到它。它们被安排在一个节点池中 - 这会有任何影响吗? - Leo
1
这些指令建议以“旁车”模式启动代理,如果您无法控制已启动的Pod(例如,如果使用Kubernetes Operator),则可能会出现问题。请查看此文章获取更多信息:https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53 - Leo

1

将@Leo在评论中的Medium文章添加到顶级https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53。一旦您按照该文章并设置了服务,您可以使用SQLAlchemy从您的DAG连接,如下所示:

import os
from datetime import datetime, timedelta
import logging

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"

default_args = {
    'start_date': datetime(2019, 7, 16)
}


def connect_to_cloud_sql():
    '''
        Create a connection to CloudSQL
    :return:
    '''
    import sqlalchemy
    try:
        PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
        logger.info("DB URL", PROXY_DB_URL)
        engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
        for result in engine.execute("SELECT NOW() as now"):
            logger.info(dict(result))
    except Exception:
        logger.exception("Unable to interact with CloudSQL")


dag = DAG(
    dag_id="example_sqlalchemy",
    default_args=default_args,
    # schedule_interval=timedelta(minutes=5),
    catchup=False  # If you don't set this then the dag will run according to start date
)


t1 = PythonOperator(
    task_id="example_sqlalchemy",
    python_callable=connect_to_cloud_sql,
    dag=dag
)


if __name__ == "__main__":
    connect_to_cloud_sql()

0

在Hoffa的类似问题的回答中,您可以找到关于Wepay如何使用Airflow操作符每15分钟保持同步的参考。

从上述答案中:

看一下WePay是如何做到这一点的:

MySQL到GCS操作符针对MySQL表执行SELECT查询。SELECT检索所有大于(或等于)上一个高水位标记的数据。高水位标记是表的主键(如果表是仅追加的),或修改时间戳列(如果表接收更新)。同样,SELECT语句也会向后回溯一段时间(或行),以捕获由于上述问题而可能删除的行。

使用Airflow,他们成功地使BigQuery每15分钟与他们的MySQL数据库保持同步。


虽然这个链接可能回答了问题,但最好在此处包含答案的基本部分,并提供参考链接。仅有链接的答案可能会因为链接页面的更改而失效。- 【来自审查】 - ste-fu
我已经添加了答案的原始文本,希望这样更好。 - Michele 'Ubik' De Simoni

0

现在我们可以连接到Cloud SQL而不必自己创建云代理。操作员将自动创建它。代码如下:

from airflow.models import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator

export_body = {
    'exportContext': {
        'fileType': 'CSV',
        'uri': EXPORT_URI,
        'databases': [DB_NAME],
        'csvExportOptions': {
            'selectQuery': SQL
        }
    }
}

default_dag_args = {}

with DAG(
        'postgres_test',
        schedule_interval='@once',
        default_args=default_dag_args) as dag:

    sql_export_task = CloudSqlInstanceExportOperator(
        project_id=GCP_PROJECT_ID,
        body=export_body,
        instance=INSTANCE_NAME,
        task_id='sql_export_task'
    )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接