Airflow Worker 配置

12

我是Airflow的新手。我正在尝试使用Celery Executor设置Airflow的分布式模式,参考了这篇文章:https://stlong0521.github.io/20161023%20-%20Airflow.html

在详细介绍规范之前,我想确认一下我已经在一个独立实例上安装了PostgreSQL

设置的规范如下:

Airflow核心/服务器计算机

  • Python 3.5
    • airflow (AIRFLOW_HOME = ~/airflow)
    • celery
    • psycogp2
  • RabbitMQ

在airflow.cfg中进行的配置:

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

测试执行情况:

RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)

Airflow工作节点计算机

已安装以下内容:

  • Python 3.5,包括:
    • airflow (AIRFLOW_HOME = ~/airflow)
    • celery
  • psycogp2

在airflow.cfg中进行的配置与服务器中完全相同:

sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

在工作机器上运行的命令输出:

运行 airflow flower 时:

[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//

我正在传递dag到Airflow Core机器,并且我已经将样本数据(Excel表格),这些数据会被dag处理,复制到了同一core机器上。

我的工作记录如下: raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

现在我的问题是:

1)我应该同样将dag文件夹复制到worker计算机中吗?

2)目前,我没有将dag文件夹复制到worker计算机上,我无法看到worker进程拾取任务。

请告诉我我犯了什么错误,以及如何使worker进程拾取任务。

4个回答

5
一些使用Airflow时最大的痛点在于部署以及在Airflow调度程序、Airflow Web服务器和Celery工作节点之间保持DAG文件和插件的同步。我们创建了一个名为Astronomer Open的开源项目,它自动化了一个Docker化的Airflow、Celery和PostgreSQL,并内置了一些其他好东西。该项目的动力来自于看到很多人遇到相同的痛点,创建了非常相似的设置。例如,这是Airflow Dockerfile: https://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile,这是文档: https://open.astronomer.io/

完全透明:这是我在工作中贡献的项目 - 我们还提供一个付费的企业版,也可以在Kubernetes上运行(文档)。 话虽如此,开放版完全免费使用。


1
这似乎不再是开源的了。 - deontologician
1
@deontologician 看起来 Dockerfiles 的位置已经改变,但它们仍然是开源的。https://github.com/astronomer/astronomer/tree/master/docker/airflow。您还可以在本地使用 Astro CLI 进行简单快速的启动体验。https://www.astronomer.io/docs/cli-quickstart - Taylor D. Edmiston
如果已经选择了Docker路径,使用Airflow的新DockerSwarmOperator在多个节点上扩展Airflow DAG可能会更容易。 - akki
@TaylorEdmiston 我个人没有测试过与远程Docker守护程序的兼容性,但是根据PR上的这条评论,我理解是可行的。操作器本身使您能够在不运行Airflow的服务器上运行Airflow任务(但它是Docker集群的一部分)。 Docker Swarm是Docker的本地编排工具 - 基本上,您有一堆运行Docker的服务器(其中一个应该运行Airflow),然后此操作器会在任何具有足够资源的服务器上运行您的任务。 - akki
@akki 谢谢,这是一个非常酷的想法,将Airflow与Docker Swarm集成。由于Swarm集群对Airflow不感知,因此似乎比直接在Airflow工作进程中运行许多DockerOperator要少一些开销。我期待着尝试它! - Taylor D. Edmiston
显示剩余2条评论

4

你的配置文件看起来没问题。正如你所怀疑的那样,所有的工作进程确实都需要一个DAG文件夹的副本。你可以使用类似于git的工具来保持它们同步和更新。


1
非常感谢您的回复。是的,我尝试将DAG文件夹放置在工作实例中,但仍然无法正常工作。最终我明白了,我不仅需要复制DAG文件夹,还需要将工作实例的fernet_key更改为与调度程序相同才能使其正常工作。 - Soundar Raj
1
我建议在调度器、工作进程和Web服务器之间保持配置相同。 - Daniel Huang
你能否详细描述一下你是如何解决这个问题的?你说仅仅改变了worker的fernet_key是什么意思? - Kyle Bridenstine
我已经在AWS EC2实例上成功运行了Airflow-Docker。现在我想将其升级为Airflow-Docker-Swarm以实现可扩展性。你们能否给出一些相关指针? - Naresh Y

1
有点晚了,但它可能仍然对某些人有帮助,因为从现有的答案来看,除了“手动”部署(通过git/scp等)外,似乎没有其他分享DAGs的方法,但实际上是有的。
Airflow支持pickling(从CLI中的-p参数或您的docker-compose文件中的command: scheduler -p),这允许在服务器/主节点上部署DAG,并将其序列化并发送到worker节点(因此您不必在多个地方部署DAG,并避免了DAG不同步的问题)。
Pickling与CeleryExecutor兼容。
Pickling有一些限制,可能会导致问题,特别是类和函数的实际代码不被序列化(只有完全限定名称),因此如果您尝试反序列化引用目标环境中不存在的代码的DAG,则会出现错误。有关pickle的更多信息,请参见:https://docs.python.org/3.3/library/pickle.html

0

是的,DAG 必须存在于所有 Airflow 节点上 - 工作节点、Web 服务器、调度器。

您可以设置一个 cron,在所有节点上运行 git pull 命令,以保持 DAG 文件夹同步。

Airflow 将会把所有 DAG 移动到数据库中,而不是文件系统 - 这个功能可能会在 2.0 版本中推出。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接