我有几个Python文件,目前是使用BashOperator执行的。这使我可以轻松选择Python虚拟环境。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
...}
dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")
t1 = BashOperator(
task_id='task1',
bash_command='~/anaconda3/envs/myenv/bin/python
/python_files/python_task1.py',
dag=dag)
我该如何使用PythonOperator实现类似的功能?
from airflow.operators.bash_operator import PythonOperator
import python_files.python_task1
python_task = PythonOperator(
task_id='python_task',
python_callable=python_task1.main,
dag=dag)
我假设PythonOperator将使用系统Python环境。我发现Airflow有PythonVirtualenvOperator,但这似乎是通过使用指定的要求即时创建新的虚拟环境来工作的。我更喜欢使用已经正确配置的现有虚拟环境。
我该如何使用指定的Python路径运行PythonOperator?