传递命令行参数给Airflow BashOperator

11

有没有办法将命令行参数传递给Airflow BashOperator?目前,我有一个接受日期参数并执行一些特定活动(如清理早于给定日期的特定文件夹)的Python脚本。

在只有一个任务的简化代码中,我想要做的是:

from __future__ import print_function
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

default_args = {
    'owner'             : 'airflow'
    ,'depends_on_past'  : False
    ,'start_date'       : datetime(2017, 01, 18)
    ,'email'            : ['abc@xyz.com']
    ,'retries'          : 1
    ,'retry_delay'      : timedelta(minutes=5)
}

dag = DAG(
    dag_id='data_dir_cleanup'
    ,default_args=default_args
    ,schedule_interval='0 13 * * *'
    ,dagrun_timeout=timedelta(minutes=10)
    )

cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command='python cleanup.py --date $DATE 2>&1 >>  /tmp/airflow/data_dir_cleanup.log'
        #--------------------------------------^^^^^^-- (DATE variable which would have been given on command line)
        #,env=env
        ,dag=dag
    )

提前感谢您的帮助,

4个回答

20

BashOperator使用Jinja2模板,这意味着您可以传递任意值。在您的情况下,它可能是这样的:

cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command="python cleanup.py --date {{ params.DATE }} 2>&1 >>  /tmp/airflow/data_dir_cleanup.log"
        ,params = {'DATE' : 'this-should-be-a-date'}
        ,dag=dag
    )

另请参阅:https://airflow.incubator.apache.org/tutorial.html#templating-with-jinja,以获取更广泛的示例。


12
我建议使用{{ params.DATE }}而不是{{ DATE }}以明确其来源。更进一步,由于它不是常量,我会使用小写的{{ params.date }} - nandoquintana
@nandoquintana编辑了代码以反映它是代码正常工作所必需的。 - Mehdi LAMRANI

5
您可以尝试以下方法(对我有用):
cmd_command = "python path_to_task/[task_name.py] '{{ execution_date }}' '{{ prev_execution_date }}'"

t = BashOperator(
     task_id = 'some_id',
     bash_command = cmd_command,
     dag = your_dag_object_name)

当我这样做时,它渲染了变量,而且运行良好。我相信它适用于所有变量(请注意,我在命令的开头放了单词“python”,因为我想运行一个.py脚本)。
我的任务已经正确编写,以便将这些变量作为命令行参数读取(sys.argv属性)。

1

BashOperator是Jinja模板化的,因此参数可以作为字典传递。

Airflow将安排任务并不提示您进行参数设置,因此当您说“需要将特定日期作为命令行参数传递”时,这是不可能的。虽然Airflow有一个执行日期的概念,这是dag计划运行的日期,可以使用宏{{ ds }}或{{ ds_nodash }}(https://airflow.incubator.apache.org/code.html#macros)将其传递给BashOperator参数。

env = {}
env['DATE'] = '{{ ds }}'  
cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command='python cleanup.py --date $DATE 2>&1 >>  /tmp/airflow/data_dir_cleanup.log'
        ,params=env
        ,dag=dag
    )

“DATE”参数将被传递给bash脚本,并可以像其他bash变量一样使用$ DATE。


我尝试了这个解决方案,但是 DS 没有被渲染出来。我找不到传递 ds 作为参数的方法!! - Omar14

-1
尝试使用 os.system("在此处输入您的命令")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接