我正在 $AIRFLOW_HOME/dags
中工作。我创建了以下文件:
- common
|- __init__.py # empty
|- common.py # common code
- foo_v1.py # dag instanciation
在
common.py
文件中:default_args = ...
def create_dag(project, version):
dag_id = project + '_' + version
dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
print('creating DAG ' + dag_id)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
在
foo_v1.py
中: from common.common import create_dag
create_dag('foo', 'v1')
在使用Python测试脚本时,看起来可以正常工作:
$ python foo_v1.py
[2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0
然后我在本地启动Web服务器和调度程序。 我的问题是,我没有看到任何ID为foo_v1
的DAG。没有创建任何pyc
文件。做错了什么? 为什么foo_v1.py
中的代码没有被执行?
x = create_dag('foo', 'v1')
- nimish