函数中的Airflow DAG?

6

我正在 $AIRFLOW_HOME/dags 中工作。我创建了以下文件:

- common
  |- __init__.py   # empty
  |- common.py     # common code
- foo_v1.py        # dag instanciation

common.py文件中:
default_args = ...

def create_dag(project, version):
  dag_id = project + '_' + version
  dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
  print('creating DAG ' + dag_id)

  t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

  t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

  t2.set_upstream(t1)

foo_v1.py 中:
 from common.common import create_dag

 create_dag('foo', 'v1')

在使用Python测试脚本时,看起来可以正常工作:

 $ python foo_v1.py
 [2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
 creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0

然后我在本地启动Web服务器和调度程序。 我的问题是,我没有看到任何ID为foo_v1的DAG。没有创建任何pyc文件。做错了什么? 为什么foo_v1.py中的代码没有被执行?


你尝试将输出分配给变量了吗?x = create_dag('foo', 'v1') - nimish
3个回答

8
为了让Airflow找到,create_dag()返回的DAG对象必须位于foo_v1.py模块的全局命名空间中。将DAG放置在全局命名空间的一种方法是将其赋值给模块级变量。
from common.common import create_dag

dag = create_dag('foo', 'v1')

另一种方法是使用globals()来更新全局命名空间:
globals()['foo_v1'] = create_dag('foo', 'v1')

后面的内容翻译如下:

后者可能看起来有些过头,但它对于动态创建多个DAG非常有用。例如,在for循环中:

for i in range(10):
    globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')

注意:任何放置在$AIRFLOW_HOME/dags目录下的*.py文件(即使在子目录中,例如您的情况下的common)都将被Airflow解析。如果您不想这样做,可以使用.airflowignore打包的DAGs


1
正如这里所提到的,你必须在创建后返回dag!
default_args = ...

def create_dag(project, version):
  dag_id = project + '_' + version
  dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
  print('creating DAG ' + dag_id)

  t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

  t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

  t2.set_upstream(t1)

  return dag # Add this line to your code!

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接