在Airflow上高效部署DAG文件的方法

55
是否有任何最佳实践可供遵循以部署新的dag到airflow?
我在谷歌论坛上看到了几条评论,指出dag保存在GIT存储库中,并定期同步到airflow集群的本地位置。关于这种方法,我有几个问题:
  • 我们是否为不同的环境(测试,生产)维护单独的dag文件?
  • 如果新版本有错误,如何回滚ETL到旧版本?
    非常感谢任何帮助。 如果需要更多详细信息,请告诉我。
  • 3个回答

    74

    以下是我们团队的管理方法。

    首先,在命名规范方面,我们每个DAG文件名与DAG本身内容中的DAG Id匹配(包括DAG版本)。这很有用,因为最终在Airflow UI中看到的是DAG Id,所以您将确切地知道每个DAG背后使用了哪个文件。

    例如,对于像这样的DAG:

    from airflow import DAG
    from datetime import datetime, timedelta
    
    default_args = {
      'owner': 'airflow',
      'depends_on_past': False,
      'start_date': datetime(2017,12,05,23,59),
      'email': ['me@mail.com'],
      'email_on_failure': True
    }
    
    dag = DAG(
      'my_nice_dag-v1.0.9', #update version whenever you change something
      default_args=default_args,
      schedule_interval="0,15,30,45 * * * *",
      dagrun_timeout=timedelta(hours=24),
      max_active_runs=1)
      [...]
    

    DAG文件的名称将为:my_nice_dag-v1.0.9.py

    • 我们所有的DAG文件都存储在Git仓库中(以及其他东西)
    • 每当一个合并请求在我们的主分支中完成时,我们的持续集成管道会开始新构建,并将我们的DAG文件打包成zip文件(我们使用Atlassian Bamboo,但也有其他解决方案,如Jenkins、Circle CI、Travis等)
    • 在Bamboo中,我们配置了一个部署脚本(shell),它会解压缩包并将DAG文件放置在Airflow服务器的/dags文件夹中。
    • 我们通常会在DEV环境中部署DAG进行测试,然后是UAT和最终的PROD。点击Bamboo UI上的按钮即可完成部署,感谢上面提到的shell脚本。

    优势:

    1. 由于您在文件名中包含了DAG版本,因此以前的DAG文件不会被覆盖在DAG文件夹中,因此您可以轻松返回它
    2. 当您的新DAG文件在Airflow中加载时,您可以通过版本号在UI中识别它。
    3. 因为你的DAG文件名=DAG ID,你甚至可以通过添加一些Airflow命令行来改进部署脚本,以便在部署后自动打开新的DAG。
    4. 因为每个DAG版本都在Git中被历史记录,所以我们可以随时返回到之前的版本。

    1
    嗨Alexis,感谢您的澄清。如果有任何特定于环境的值,例如在HttpOperator中的URL,这些值如何处理?您会为每个环境维护单独的dag文件,还是使用某种配置管理系统来处理? - Sreenath Kamath
    2
    你好@SreenathKamath,为了针对特定环境的值,我们将它们配置在相应的Airflow环境中的变量中。您可以在菜单下找到它们:Admin> Variables。在您的DAG中,您可以使用from airflow.models import Variable调用这些变量,然后使用Variable.get('my_variable_name') - Alexis.Rolland
    @SreenathKamath 如果答案满足了您的需求,请考虑将此问题标记为已解决。谢谢。 - Alexis.Rolland
    1
    @alexis 如果我只是更新具有相同名称的文件(dag),则无需打开开关。 - gorros
    1
    @alexis 我还有一个问题,你是如何测试DAG文件的呢? - gorros
    显示剩余6条评论

    1

    1
    截至目前,Airflow还没有自己的工作流版本控制功能(参见this)。但是您可以通过将DAG管理到其自己的git存储库中,并将其状态作为子模块获取到airflow存储库中来自行管理。这样,您始终拥有一个包含特定版本DAG集的单个Airflow版本。在此处了解更多here

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接