Airflow中针对外部连接的连接池化

5

我正在尝试为Airflow中创建的外部连接寻找连接池管理方法。
Airflow版本:2.1.0
Python版本:3.9.5
Airflow数据库:SQLite
已创建的外部连接:MySQL和Snowflake

我知道在airflow.cfg文件中有一些属性。

sql_alchemy_pool_enabled = True  
sql_alchemy_pool_size = 5

但是这些属性是用于管理内部数据库的 airflow,而在我的情况下是 SQLite。

我有一些任务需要读取或写入 MySQL 和 Snowflake 中的数据。

snowflake_insert = SnowflakeOperator(
          task_id='insert_snowflake',
          dag=dag,
          snowflake_conn_id=SNOWFLAKE_CONN_ID,
          sql="Some Insert query",
          warehouse=SNOWFLAKE_WAREHOUSE,
          database=SNOWFLAKE_DATABASE,
          schema=SNOWFLAKE_SCHEMA,
          role=SNOWFLAKE_ROLE
     )

并且

insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)

从MySQL读取数据

def get_records():
    mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
    records = mysql_hook.get_records(sql=r"""Some select query""")
    print(records)

我观察到在Snowflake中,每个任务(同一个dag中有多个任务)都会创建一个新的会话,对于MySQL尚未进行验证。
是否有一种方法可以为外部连接(在我的情况下是Snowflake和MySQL)维护连接池,或者通过其他方式在同一个会话中运行同一DAG中的所有查询?
谢谢
1个回答

2
Airflow提供使用Pools作为限制对外部服务的并发性的方法。
您可以通过UI创建池: 菜单 -> 管理 -> 池
或者使用CLI
airflow pools set NAME slots

池子有插槽来定义使用资源的任务可以并行运行的数量。如果池子已满,则任务将排队等待插槽可用。
在操作器中使用池子只需在操作器中添加pool=Name
在您的情况下,假设Pool的名称为snowflake,则:
snowflake_insert = SnowflakeOperator(
          task_id='insert_snowflake',
          dag=dag,
          snowflake_conn_id=SNOWFLAKE_CONN_ID,
          sql="Some Insert query",
          warehouse=SNOWFLAKE_WAREHOUSE,
          database=SNOWFLAKE_DATABASE,
          schema=SNOWFLAKE_SCHEMA,
          role=SNOWFLAKE_ROLE,
          pool='snowflake',
     )

请注意,默认情况下,任务占用池中的1个插槽,但可以进行配置。如果使用pool_slots,任务可能会占用多个插槽。
snowflake_insert = SnowflakeOperator(
          task_id='insert_snowflake',
          ...
          pool='snowflake',
          pool_slots=2,
     )

嗨@Elad,感谢您的快速回复。 我猜你提到的池是用于任务池管理的。 我想要一个用于数据库连接池管理的东西。 这个池参数也适用于数据库连接吗?谢谢。 - Shashank Gupta
1
你可以自由选择如何使用这个池。例如,你可以选择在访问特定连接时使用特定的池。你可以使用集群策略 https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html 来创建各种环境。 - Elad Kalif
@Elad,您能否解释一下在这种情况下DB池实例存储在哪里?我们是否可以在某个地方创建例如cx_oracle SessionPool实例并在任务之间共享它? - alaptiko

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接