How to run tasks in parallel in Apache Airflow

I have the following DAG running in Airflow: [DAG screenshot omitted; a sketch of its shape appears below] When the DAG above is executed, it runs serially, in one of the following orders:
A -> B -> C1 -> C2 -> D1 -> D2
A -> B -> C2 -> C1 -> D2 -> D1
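(Since the original screenshot is unavailable, here is a minimal sketch of a DAG with the shape described above. The task names come from the question, but the DummyOperator choice and the Airflow 1.x-style imports are assumptions.)

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('parallel_example', start_date=datetime(2019, 1, 1), schedule_interval=None)

    a = DummyOperator(task_id='A', dag=dag)
    b = DummyOperator(task_id='B', dag=dag)
    c1 = DummyOperator(task_id='C1', dag=dag)
    c2 = DummyOperator(task_id='C2', dag=dag)
    d1 = DummyOperator(task_id='D1', dag=dag)
    d2 = DummyOperator(task_id='D2', dag=dag)

    a >> b >> [c1, c2]  # B fans out; C1 and C2 have no dependency on each other
    c1 >> d1
    c2 >> d2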
But my requirement is to run the C1 and C2 tasks at the same time. Here is part of my airflow.cfg:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
#executor = LocalExecutor

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Number of workers to refresh at a time. When set to 0, worker refresh is
# disabled. When nonzero, airflow periodically refreshes webserver workers by
# bringing up new ones and killing old ones.
worker_refresh_batch_size = 1

# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30

# Secret key used to run your flask app
secret_key = temporary_key

# Number of workers to run the Gunicorn web server
workers = 4

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. However airflow will never
# use more threads than the amount of cpu cores available.
max_threads = 2


This would only happen if you were using the SequentialExecutor, but that is clearly not what you have set in airflow.cfg. Could you share the output of echo $AIRFLOW_HOME? - Tameem
@Tameem, /opt/airflow - Hasitha
Did you restart the webserver and the scheduler after changing the configuration? - Tameem
@Tameem, yes, several times. - Hasitha
Which commands did you run to restart the webserver, the scheduler, and the workers? - Tameem
3 Answers

Add concurrency=x (where x is an integer greater than 1) to your DAG's properties.
max_active_runs limits DAG-level concurrency (how many runs of the DAG may be active at once), while concurrency limits how many task instances of that DAG may run at once.
Example:
    from airflow import DAG

    dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval='00 03 * * *',
        max_active_runs=2,
        concurrency=2)
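Note: in Airflow 2.2 and later the concurrency DAG argument has been renamed max_active_tasks; the old name is deprecated but still accepted at the time of writing.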

If you are only testing this on a single machine, I suggest using the LocalExecutor. The SequentialExecutor runs tasks one at a time, while the CeleryExecutor requires a cluster of machines with a message broker.
Also, when you use the LocalExecutor you should use a metadata database other than SQLite, since SQLite does not support multiple concurrent connections. You can use Postgres or MySQL and change sql_alchemy_conn in airflow.cfg accordingly.
Read this: https://airflow.apache.org/howto/initialize-database.html

"LocalExecutor: an executor that can parallelize task instances locally."
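For example, the relevant airflow.cfg entries might look like this (a sketch only; the connection string is a placeholder, not a value from the question):

[core]
executor = LocalExecutor
# placeholder connection string; substitute your own host, user, password, and database
sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow_db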



This looks like a configuration problem. From your configuration I can see that the executor is CeleryExecutor. Check the database and message broker components backing it.

If those components are not configured to run work in parallel, your tasks will not run in parallel either.
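With the CeleryExecutor that means the [celery] section must point at a reachable broker and result backend, for example (a sketch with placeholder endpoints; Redis as the broker and Postgres as the result backend are assumptions, and older 1.x releases name the second key celery_result_backend):

[celery]
# placeholder endpoints; point these at your actual broker and result backend
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow_user:airflow_pass@localhost:5432/airflow_db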

