SimpleHttpOperator Airflow, templating data


I'm trying to get the data of a SimpleHttpOperator to render correctly from the config sent via dag_run:

result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

The problem is that the rendered data looks like this:
{"url": "{{ dag_run.conf[\"url\"] }}", "fileType": "{{ dag_run.conf[\"fileType\"] }}"}

I'm not sure what I'm doing wrong here.

Edit: Below is the full code:

# Imports added for completeness; paths follow the Airflow 1.10-style imports used elsewhere in this post.
import json
import pprint

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(0),
}


def print_result(**kwargs):
    # Pull the response that schema_detector pushed to XCom via response_filter.
    ti = kwargs['ti']
    pulled_value_1 = ti.xcom_pull(task_ids='schema_detector')
    pprint.pprint(pulled_value_1)


with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:

    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

    pull = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
        provide_context=True,  # on Airflow 1.10 this is required for kwargs['ti'] to be populated
    )
    result >> pull

Could you describe the flow in more detail? Where do you see the incorrect rendering? How are you executing this operator? - Elad Kalif
Yes, I've included all the code. My problem is that I'm trying to collect the parameters I send when triggering the DAG (via the {{ dag_run }} template) and send an HTTP request with exactly those parameters through the SimpleHttpOperator. - Tizianoreica
What I don't understand is how to make the rendering happen before json.dumps is applied, since I believe that's where the problem is. - Tizianoreica
2 Answers


I struggled with the same error for a long time, so I created my own operator (called ExtendedHttpOperator), which is a combination of the PythonOperator and the SimpleHttpOperator. It works for me :)

This operator takes a function in which we can collect the data passed in at trigger time (via dag_run.conf) and parse it (if needed) before sending it to the API.

import json

from plugins.operators.extended_http_operator import ExtendedHttpOperator


def passing_data(**context):
    # Build the request body at runtime from the conf passed when triggering the DAG.
    api = context["api"]
    dag_run_conf = context["dag_run"].conf
    return json.dumps(dag_run_conf[api])


def validate_response(res):
    return res.status_code == 200


testing_extend = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="user_api",
    endpoint="/kafka",
    headers={"Content-Type": "application/json"},
    data_fn=passing_data,
    op_kwargs={"api": "kafka"},
    method="POST",
    log_response=True,
    response_check=validate_response,
)
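For this data_fn to work, the DAG run has to be triggered with a conf object keyed by the API name, because passing_data looks up dag_run_conf[api] with api set to "kafka" through op_kwargs. A hypothetical conf payload would look like this (the inner values are made up):

{"kafka": {"user_ids": [1, 2, 3]}}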

Here is how to add the ExtendedHttpOperator to Airflow:

Place the extended_http_operator.py file inside your your_airflow_project/plugins/operators folder.
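A minimal sketch of the resulting layout (dags/ is shown only for orientation; this answer only requires the plugins part):

your_airflow_project/
├── dags/
└── plugins/
    ├── __init__.py
    └── operators/
        ├── __init__.py
        └── extended_http_operator.py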

# extended_http_operator.py file

from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from typing import Optional, Dict

"""
Extend Simple Http Operator with a callable function to formulate data. This data function will
be able to access the context to retrieve data such as task instance. This allow us to write cleaner 
code rather than writing one long template line to formulate the json data.
"""


class ExtendedHttpOperator(SimpleHttpOperator):
    @apply_defaults
    def __init__(
        self,
        data_fn,
        log_response: bool = False,
        op_kwargs: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        if not callable(data_fn):
            raise AirflowException("`data_fn` param must be callable")
        self.data_fn = data_fn
        self.context = None
        self.op_kwargs = op_kwargs or {}
        self.log_response = log_response

    def execute(self, context):
        # Merge user-supplied op_kwargs into the context so data_fn can read them.
        context.update(self.op_kwargs)
        self.context = context
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)

        # Build the request body at execution time, when dag_run.conf is available.
        data_result = self.execute_callable(context)

        self.log.info("Calling HTTP method")
        self.log.info("Post Data: {}".format(data_result))
        response = http.run(
            self.endpoint, data_result, self.headers, self.extra_options
        )
        if self.log_response:
            self.log.info(response.text)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Invalid parameters")

    def execute_callable(self, context):
        return self.data_fn(**context)

Don't forget to create empty __init__.py files in both the plugins and plugins/operators folders.
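For example, from the project root:

touch plugins/__init__.py plugins/operators/__init__.py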


I couldn't find a direct solution. The only way was to add a PythonOperator to collect the information, and then use XCom in my SimpleHttpOperator to pass along the information I had sent via --conf.

Code:

import logging

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator

# default_args is the same dict as in the question above.


def generate_data(**kwargs):
    # Collect the conf passed at trigger time; returning it pushes it to XCom.
    confs = kwargs['dag_run'].conf
    logging.info(confs)
    return {'url': confs["url"], 'fileType': confs["fileType"]}


with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:
    generate_dict = PythonOperator(
        task_id='generate_dict',
        python_callable=generate_data,
        provide_context=True
    )
    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data="{{ task_instance.xcom_pull(task_ids='generate_dict') |tojson}}",
        log_response=True,
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())
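For completeness, a run can then be triggered with the conf that generate_data expects; the URL and file type below are placeholders:

# Airflow 1.10 CLI
airflow trigger_dag airflow_http_operator --conf '{"url": "https://example.com/file.csv", "fileType": "csv"}'
# Airflow 2.x CLI
airflow dags trigger airflow_http_operator --conf '{"url": "https://example.com/file.csv", "fileType": "csv"}'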
