Airflow - 在Ubuntu上自定义XCom后端

3

我正在尝试实现自定义的XCOM后端。

以下是我所做的步骤:

  1. 在主Airflow目录(AIRFLOW_HOME)下创建了“include”目录。
  2. 在其中创建了名为“custom_xcom_backend.py”的文件。
from typing import Any
from airflow.models.xcom import BaseXCom

import pandas as pd


class CustomXComBackend(BaseXCom):

    @staticmethod
    def serialize_value(value: Any):
        if isinstance(value, pd.DataFrame):
            value = value.to_json(orient='records')
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        result = BaseXCom.deserialize_value(result)
        result = df = pd.read_json(result)
        return result


在配置文件中设置:
  1. 在配置文件中进行设置:
xcom_backend = include.custom_xcom_backend.CustomXComBackend

当我重新启动Web服务器时,我遇到了以下问题:
airflow.exceptions.AirflowConfigException: The object could not be loaded. Please check "xcom_backend" key in "core" section. Current value: "include.cust...

我猜测它没有识别到“include”文件夹,但我该如何修复呢?*注意:没有使用docker,而是安装在Ubuntu机器上。谢谢!
1个回答

3

我已经解决了这个问题:

  1. 将custom_xcom_backend.py放入插件目录中
  2. 在配置文件中进行设置:
xcom_backend = custom_xcom_backend.CustomXComBackend
  1. 重启所有与airflow相关的服务

*注意:不要以这种方式存储DataFrames(不良实践)。

我使用的来源: https://www.youtube.com/watch?v=iI0ymwOij88


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接