Airflow中的Python脚本调度

32

大家好,

我需要使用airflow调度我的python文件(其中包含从sql中提取数据和一些联接)。我已经成功地将airflow安装到我的linux服务器上,并且airflow的webserver也可用于我。但是,即使我查看了文档,我还不清楚我需要在哪里编写脚本以进行调度,以及该脚本如何在airflow webserver上可用,以便我可以查看状态

就配置而言,我知道dag文件夹位于我的主目录中,并且也知道示例dag所在的位置。

注意:请不要将此视为与如何在Airflow中运行bash脚本文件的重复问题,因为我需要运行位于某个不同位置的python文件。

请查看Airflow webserver中的配置信息:

enter image description here

以下是AIRFLOW_HOME目录中dag文件夹的屏幕截图

enter image description here

还请查看下面关于DAG创建的屏幕截图以及"Missing DAG"错误

enter image description here

enter image description here

选择simple DAG后,会出现缺少DAG的错误

enter image description here

4个回答

41

你应该使用PythonOperator来调用你的函数。如果你想在其他地方定义这个函数,只需要从模块中导入它就可以了,只要它在你的PYTHONPATH中可访问。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from my_script import my_python_function

dag = DAG('tutorial', default_args=default_args)

PythonOperator(dag=dag,
               task_id='my_task_powered_by_python',
               provide_context=False,
               python_callable=my_python_function,
               op_args=['arguments_passed_to_callable'],
               op_kwargs={'keyword_argument':'which will be passed to function'})

如果您的函数my_python_function在脚本文件/path/to/my/scripts/dir/my_script.py中,那么在启动Airflow之前,您可以将路径添加到 PYTHONPATH 中,如下所示:

export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH

更多信息在这里:https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

默认参数和其他注意事项请参考教程:https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html


嗨@postrational,感谢您的建议。请指导我在airflow中写这个脚本的位置,您在评论中提到的那个?抱歉,我对airflow非常陌生。 - Abhishek Pansotra
好的,包含DAG定义的文件应该放在AIRFLOW_HOME/dags目录下。包含Python函数的文件可以放在任何目录中,只要它在PYTHONPATH上就可以了。我在我的答案中添加了一些信息。 - postrational
嗨@postrational,包含DAG定义的文件的文件扩展名是什么?在我的airflow配置中,dags_folder被指定为**/home/amit/airflow/dags**,所以我已经将DAG定义放在那里了..这样做对吗? - Abhishek Pansotra
没错。DAG定义的代码是Python,因此文件应该具有.py扩展名。 - postrational
我已经添加了相同的内容..现在如何将其导入到Airflow UI中?我已经创建了DAG运行并提供了DAG Id为“tutorial”,但没有显示DAG条目表。 - Abhishek Pansotra
显示剩余3条评论

30

你也可以使用bashoperator在Airflow中执行Python脚本。你可以将脚本放在DAG文件夹中的一个文件夹中。如果你的脚本在其他地方,只需提供这些脚本的路径。

    from airflow import DAG
    from airflow.operators import BashOperator,PythonOperator
    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
      }

    dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
    task_id='testairflow',
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
    dag=dag)

嗨Navjot,感谢您的建议。请指导我在airflow中写这个脚本的位置,这是您在评论中提到的吗?抱歉,我对airflow非常陌生。 - Abhishek Pansotra
1
希望你已经弄清楚了,但是为了其他人,dags文件夹在airflow.cfg中指定的位置,在AIRFLOW_HOME目录中。 - saadi

2
Airflow会解析$AIRFLOW_HOME/dags(在您的情况下为/home/amit/airflow/dags)中的所有Python文件。那个Python脚本应该像"postrational"的答案中所示一样返回一个DAG对象。如果出现缺失的情况,这意味着Python代码存在问题,Airflow无法加载它。请检查Airflow Web服务器或调度程序日志以获取更多详细信息,因为stderr或stdout会被记录在那里。

谢谢……Python脚本出了问题,已经解决。 - Abhishek Pansotra

0
  1. 按照Airflow官方文档安装Airflow。最好在Python虚拟环境中安装。 http://python-guide-pt-br.readthedocs.io/en/latest/dev/virtualenvs/
  2. 第一次使用时,使用以下命令启动Airflow:

airflow webserver -p <port>

它会自动加载示例DAG,可以在$HOME/airflow/airflow.cfg中禁用。

`load_examples = False`
  1. 在 $HOME/airflow/ 中创建 dags 文件夹,从 https://airflow.incubator.apache.org/tutorial.html 下载 tutorial.py 文件并放入 dags 文件夹中。
  2. 进行一些实验,在 tutorial.py 文件中进行更改。如果您将 schedule_interval 设置为 cron 语法,则 'start_date' : datetime(2017, 7, 7)

    'start_date': datetime.now()
    

    dag = DAG('tutorial', default_args=default_args,schedule_interval="@once") 或者 dag = DAG('tutorial', default_args=default_args,schedule_interval="* * * * *") # 每分钟调度

  3. 启动 airflow:$ airflow webserver -p <port>

  4. 启动调度器:$ airflow scheduler

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接