Airflow - 任务间数据库连接的复用

3

我正在尝试使用Airflow将数据导入Redshift。我的源数据库中有许多表格,我编写了Python模块,可实现端到端的ETL进入Redshift。我打算使用Airflow协调它们的执行,其中每个表格的ETL将是DAG中的任务。重要的是,我希望避免为每个表格建立Redshift连接。相反,我想在开始时建立一次连接,并希望重复使用该连接来处理每个下游任务。

我尝试使用xcom,但似乎连接对象不可pickle-able,因此无法使用(而且pickle化“connection”的概念并不太合理)。我还尝试了以下方法使用全局变量,但似乎并不起作用,因为它显然为每个任务实例化一个新对象:

redcon = None
red_iam_role = None

dag = DAG('redshift_etl', description='An example redshift etl prototype', schedule_interval='0 12 * * *', start_date=datetime(2017, 3, 20), catchup=False)

def connect_to_redshift():
    global redcon, red_iam_role
    if redcon is None or red_iam_role is None:
        (redcon, red_iam_role) = redshift_utilities.get_connection('redshift_cluster', 15) #a module that returns a tuple consisting of redshift connection object (using psycopg2) and redshift_iam_role which I use for a few tasks.
        return {"connection": redcon, "iam_role": red_iam_role}
    else:
        return {"connection": redcon, "iam_role": red_iam_role}

def first_task(redshift_params,**context):
    print(redshift_params["connection"])


def second_task(redshift_params,**context):
    print(redshift_params["connection"])


first_task_dag = PythonOperator(task_id='first_task',provide_context=True,op_kwargs={"redshift_params":connect_to_redshift()}, python_callable=first_task, dag=dag)
second_task_dag = PythonOperator(task_id='second_task',provide_context=True,op_kwargs={"redshift_params":connect_to_redshift()}, python_callable=second_task, dag=dag)

first_task_dag >> second_task_dag


有没有办法实现这个?我不想在Airflow中设置连接(例如JdbcHook),并在此上下文中使用它。


你为什么关心重复使用同一个连接?你想在不同的任务中使用临时表吗? - undefined
由于像Redshift这样的MPP系统通常对同时连接数有限制,并且每个连接都涉及一定的成本(例如会话建立、资源分配、消息交换),因此通常认为最好只建立一次与该系统的连接,并在该连接上重用所有操作。 - undefined
你可以使用连接池(一个Airflow的特性)来限制并发,并确保在操作器设计中关闭连接。除此之外,我认为你只需要让你的任务“更大”,在同一个连接上执行更多的操作,如果这对你很重要的话。 - undefined
1个回答

1
如果您所说的“连接”是指数据库连接,那么不行。每个Airflow任务实例在自己的进程中执行,因此您将无法重用相同的连接。
如果您想要为多个操作重用同一连接,您需要将它们合并为一个任务(例如,在execute中,循环遍历每个表并执行您的工作)。
没有必要使用jdbc。您应该能够使用PostgresHook来连接Redshift。
如果您指的是空气流连(即 Connection模型,它是空气流管理凭证的方式),那么是的。根据管理连接中所述一次定义您的连接,并在每个任务定义中引用它。

通过空气流连接选项,它是每个DAG建立一次连接还是每个任务建立一次连接? - undefined
但是,DAG(有向无环图)并不执行任何工作——只有任务才能执行诸如打开数据库连接之类的操作。不同的任务将打开不同的连接。它们可能在不同的机器上运行,肯定在不同的进程中运行。Airflow的Connection对象仅仅是一种检索凭据的方式。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接