I'm new to Airflow. I'm trying to set up Airflow in distributed mode with the Celery Executor, following this article: https://stlong0521.github.io/20161023%20-%20Airflow.html
Before getting into the details of the specification, I'd like to mention that I've installed PostgreSQL on a separate instance.
The specifications of the setup are as follows:
Airflow core/server machine
- Python 3.5
- airflow (AIRFLOW_HOME = ~/airflow)
- celery
- psycopg2
- RabbitMQ
Configuration made in airflow.cfg:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
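As a sanity check before starting any services, the connection URIs above can be parsed to confirm they point at the hosts and ports intended (a minimal sketch; the usernames, passwords, and IPs are the placeholders from my config, not real credentials):

```python
from urllib.parse import urlparse

# Placeholder URIs copied from airflow.cfg.
sql_alchemy_conn = "postgresql+psycopg2://username:password@192.168.2.12:5432/airflow"
broker_url = "amqp://username:password@192.168.1.12:5672//"

for name, uri in [("sql_alchemy_conn", sql_alchemy_conn), ("broker_url", broker_url)]:
    parts = urlparse(uri)
    # Print the scheme, host, port, and path each URI resolves to.
    print(name, parts.scheme, parts.hostname, parts.port, parts.path)
```

Note that the metadata database (192.168.2.12) and the broker (192.168.1.12) sit on different hosts here, so both must be reachable from every machine.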
Tests performed:
RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom DAGs)
Airflow worker machine
Has the following installed:
- Python 3.5 with:
- airflow (AIRFLOW_HOME = ~/airflow)
- celery
- psycopg2
Configuration made in airflow.cfg is exactly the same as on the server:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
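Since the worker's airflow.cfg is supposed to be an exact copy of the server's, I compared the relevant keys programmatically rather than by eye (a minimal sketch; the inline config text stands in for the two real ~/airflow/airflow.cfg files, and the [celery] section layout is what my Airflow version uses):

```python
from configparser import ConfigParser

# Hypothetical stand-in for the server's airflow.cfg; in practice this
# would be read from ~/airflow/airflow.cfg on the core machine.
server_cfg = """
[core]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow

[celery]
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
"""
# The worker's copy should be byte-for-byte identical.
worker_cfg = server_cfg

def load(text):
    """Flatten a config into {(section, key): value} for easy diffing."""
    cp = ConfigParser()
    cp.read_string(text)
    return {(s, k): v for s in cp.sections() for k, v in cp.items(s)}

mismatched = {k for k in load(server_cfg) if load(server_cfg)[k] != load(worker_cfg).get(k)}
print("mismatched keys:", mismatched or "none")
```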
Output of commands run on the worker machine:
When running airflow flower:
[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//
I am shipping the DAGs to the Airflow core machine, and I have also copied the sample data (Excel sheets) that the DAGs process onto the same core machine.
My worker log is as follows:
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
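For context, this traceback is just what `subprocess` raises whenever a child command exits non-zero, so the real failure is inside the spawned `airflow run ...` command, not in the wrapper. A minimal reproduction of the error class (simulating a failing child with a plain Python process, not the actual Airflow command):

```python
import subprocess
import sys

# Spawn a child process that exits with status 1, the way the
# 'airflow run ...' subprocess did on my worker.
try:
    subprocess.check_call([sys.executable, "-c", "import sys; sys.exit(1)"])
except subprocess.CalledProcessError as e:
    # check_call raises CalledProcessError carrying the child's exit status.
    print("returned non-zero exit status", e.returncode)
```

To see the underlying error, the quoted `airflow run ... --local -sd ...` command can be run by hand on the worker and its stderr inspected.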
Now my questions are:
1) Should I copy the dag folder to the worker machine as well?
2) As of now, I have not copied the dag folder onto the worker machine, and I cannot see the worker process picking up the task.
Please point out where I am making a mistake and how to get the worker process to pick up the task.
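One approach I'm considering for question 1 is keeping the dags folder identical on every machine with a one-way sync. A minimal sketch with Python's shutil (the paths are hypothetical temp-directory stand-ins for ~/airflow/dags on each machine; in practice rsync, a shared volume, or a git checkout would serve the same purpose):

```python
import shutil
import tempfile
from pathlib import Path

# Hypothetical stand-ins for ~/airflow/dags on the core machine and the worker.
src = Path(tempfile.mkdtemp()) / "dags"
dst = Path(tempfile.mkdtemp()) / "dags"
src.mkdir()
(src / "sample_data_xx.py").write_text("# DAG definition goes here\n")

# Mirror the core machine's dags folder onto the worker.
shutil.copytree(str(src), str(dst))
print(sorted(p.name for p in dst.iterdir()))
```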
With DockerSwarmOperator it may be easier to scale an Airflow DAG across multiple nodes. - akki
DockerOperator has less overhead. I'm looking forward to trying it! - Taylor D. Edmiston