Airflow使用RabbitMQ和Celery时,任务无法执行

4
以下是我使用的配置:
[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /root/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = /root/airflow/dags

# The folder where airflow should store its log files. This location
base_log_folder = /root/airflow/logs

# An S3 location can be provided for log backups
# For S3, use the full URL to the base folder (starting with "s3://...")
s3_log_folder = None

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
#executor = SequentialExecutor
#executor = LocalExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
#sql_alchemy_conn = sqlite:////home/centos/airflow/airflow.db
sql_alchemy_conn = mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above


# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = pyamqp://guest:guest@XXX.XXX.XXX.XXX:5672/


# Another key Celery setting
celery_result_backend = db+mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the port that Celery Flower runs on
flower_port = 5556

# Default queue that tasks get assigned to and that worker listen on.
default_queue = = default

但是作业并没有运行……调度程序显示正在检查状态,如下所示。
[2017-05-11 05:09:13,070] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-13 00:00:00: scheduled__2015-06-13T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,072] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-14 00:00:00: scheduled__2015-06-14T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,074] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-15 00:00:00: scheduled__2015-06-15T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,076] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-16 00:00:00: scheduled__2015-06-16T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,078] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 04:46:29: manual__2017-05-10T04:46:28.756946, externally triggered: True>
[2017-05-11 05:09:13,080] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 05:08:20: manual__2017-05-10T05:08:20.252573, externally triggered: True>

Airflow UI已经启动。Celery flower没有显示任何工作程序。我的任务没有运行。

以下是我开始使用的步骤。

Airflow调度器

airflow webserver -p 8080

Airflow工作程序

我有什么遗漏吗?


当您启动工作程序时,我建议检查工作程序的控制台。有什么信息吗? - jhnclvr
我在Flower UI上也没有看到任何工作进程。不确定是否在配置中漏掉了什么。另外,“Airflow worker”是否足以启动Celery工作进程? - Chetan J
"airflow worker"就可以了。在执行时,请检查控制台输出。我认为那里可能会有更多关于正在发生的事情的信息。 - jhnclvr
1个回答

1

如果您不知道Airflow的版本以及如何配置rabbitmq-server,那么很难确定如何回答您的问题。但是,我可以提供一些需要注意的事项。

这里是Celery指定代理URL的文档。在您的airflow.cfg中,代理URL没有指定虚拟主机,因此根据文档,将使用默认虚拟主机。我查了一些资料,但找不到pyampq的默认虚拟主机,但这值得关注。

或者,您也可以使用rabbitmqctl显式地配置虚拟主机。 这里有一个帖子,有人介绍了如何在Airflow中执行此操作。我已经复制并粘贴了相关信息如下:

# Configure RabbitMQ: create user and grant privileges
rabbitmqctl add_user rabbitmq_user_name rabbitmq_password
rabbitmqctl add_vhost rabbitmq_virtual_host_name
rabbitmqctl set_user_tags rabbitmq_user_name rabbitmq_tag_name
rabbitmqctl set_permissions -p rabbitmq_virtual_host_name rabbitmq_user_name ".*" ".*" ".*"

最后,您可能会遇到与您正在使用的Celery版本有关的问题。发布时,Celery 4.X.X与Airflow不兼容。尝试卸载Celery并重新安装一个可用的版本。

pip uninstall celery
pip install celery==3.1.7

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接