如何将PostgreSQL查询结果传递到Airflow中的变量中?(使用Postgres Operator或Postgres Hook)

3

我打算使用PostgreSQL作为我的任务元信息提供者,因此我想运行一些查询并获取一些数据,然后将其作为填充变量传递到另一个任务。

问题是当我使用PostgresHook时,我可以获得数据,但它在一个Python方法中,我无法访问,事实上我看到下面这行代码:

[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]

这是我的部分代码:

def _query_postgres(**context):
    """
    Queries Postgres and returns a cursor to the results.
    """

    postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
    conn = postgres.get_conn()
    cursor = conn.cursor()
    mark_williams = cursor.execute(" SELECT * FROM public.aramis_meta_task; ")

    # iterate over to get a list of dicts
    details_dicts = [doc for doc in cursor]

    # serialize to json string
    details_json_string = json.dumps(details_dicts, default=json_util.default)

    task_instance = context['task_instance']
    task_instance.xcom_push(key="my_value", value=details_json_string)
    return details_json_string

但是我不知道应该使用哪个变量来访问它,也不知道如何将其推送到XCOM中,以便我可以将返回的值作为参数传递给另一个BashOperator任务(例如Spark)。

另一方面,PostgresOperator仅返回None作为结果。


你正在使用哪个版本的Airflow? - Josh
@jdimella 最新版本,2.x。 - Aramis NSR
2个回答

3
XComs 的原理是你可以在一个任务中 push 它们,在另一个任务中 pull 它们。如果你想在 bash operator 中使用在 _query_postgres 函数中推送的 XCom,可以使用类似于以下的方法:
puller = BashOperator(
        task_id="do_something_postgres_result",
        bash_command="some-bash-command {{ task_instance.xcom_pull(key='my_value', task_ids='query_postgres_task_id_here') }}",
        dag=dag)

您需要用适当的命令替换bash_command,并将xcom_pull()中的task_ids更改为调用_query_postgres函数的任务的task_id

关于PostgresOperator,即使您运行SELECT查询,返回None也是可以的。 您使用PostgresHook的方式是可以的。

了解XComs的一些好资源:

  1. https://medium.com/analytics-vidhya/airflow-tricks-xcom-and-subdag-361ff5cd46ff
  2. https://precocityllc.com/blog/airflow-and-xcom-inter-task-communication-use-cases/
  3. https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py

1
我在bash环境中访问它并没有问题,你的代码“类似这样”的确是答案。已投票,谢谢。 - Aramis NSR
好的。您也可以将问题标记为已回答,以帮助其他人 :) - bruno-uy
当然,我会等待一到两周并勾选您的答案。 :) - Aramis NSR
如果您认为这是一个好问题并有助于社区,您也可以点赞该问题。 - Aramis NSR

2
PostgresOperator没有返回任何值,所以不幸的是您无法使用它来传递数据。您将不得不实现自己的operator,对此您确实可以使用PostgresHook。
有几件事需要注意您的代码:
  1. “Returned value was”日志是从PythonOperator输出的吗?
  2. 您可以使用xcom_push()“显式地”推送到XCom,但是返回值也会自动推送到XCom,因此您的输出将存储在XCom中两次。
  3. 您可以使用xcom_pull()“拉取”XCom值,更多细节请参见:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
  4. 您可以使用cursor.fetchall()从Postgres游标中获取所有输出,我这里有一个类似的示例(它将输出写入本地磁盘):https://github.com/godatadriven/airflow-testing-examples/blob/master/src/testing_examples/operators/postgres_to_local_operator.py#L35-L41
  5. 小心大数据和XCom。默认情况下,XCom存储在Airflow元数据库中,您不希望在其中存储太大的数据。或者,您可以配置自定义XCom后端,以便您可以使用例如AWS S3进行XCom存储:https://www.astronomer.io/guides/custom-xcom-backends

谢谢您的回复,您尝试过在其他数据库(如Redis)中使用吗?我正在考虑在其他数据库中这样做。 - Aramis NSR

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接