我正在尝试使用Airflow来执行一个简单的Python任务。
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
如果我试一下,例如:
airflow test python_test print 2015-01-01
它有效!
现在我想把我的 def print_context(ds, **kwargs)
函数放到另一个 Python 文件中。所以我创建了另一个名为 simple_test.py 的文件并进行如下更改:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
现在我再试一次运行:
airflow test python_test print 2015-01-01
没问题!它仍然能工作!
但是如果我创建一个模块,例如名为SimplePython.py的worker模块,导入它(from worker import SimplePython
),并尝试:
airflow test python_test print 2015-01-01
它会显示以下信息:
ImportError:找不到名为worker的模块
问题:
- 在DAG定义中导入模块是否可行?
- Airflow + Celery将如何在工作节点之间分发所有必要的Python源文件?
Note: I have kept the HTML tags and made the content more understandable.