Changing the execution concurrency of an Airflow DAG

6
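I would like to change the dag_concurrency parameter for a specific Airflow DAG. There seems to be a global dag_concurrency parameter in airflow.cfg, but is it possible to set different values for different DAGs?
I tried adding a concurrency parameter to the SSHExecuteOperator task in my DAG code, but the DAG details still show the default concurrency value (16).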
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['exceptions@airflow.com'],
  'email_on_failure': True,
  'retries': 0
}

#server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
  'ed_data_quality_20min-v1.6.6',
  default_args=default_args,
  schedule_interval="0,20,40 * * * *",
  dagrun_timeout=timedelta(hours=24)) as dag:
  (
    dag
    >> SSHExecuteOperator(
          task_id='run_remote_ed_data_quality_20min',
          bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
          ssh_hook=sshHookEtl,
          retries=0,
          concurrency=1,
          dag=dag)
  )

Here are the DAG details.

2 Answers

5

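I found the solution. I was not adding the concurrency parameter in the right place. It should be passed directly to the DAG object as an argument, not to the SSHExecuteOperator task. Here is the new code: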

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['exceptions@airflow.com'],
  'email_on_failure': True,
  'retries': 0
}

#server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
  'ed_data_quality_20min-v1.6.6',
  default_args=default_args,
  schedule_interval="0,20,40 * * * *",
  dagrun_timeout=timedelta(hours=24),
  concurrency=1) as dag:  # concurrency is set on the DAG itself, not on the operator
  (
    dag
    >> SSHExecuteOperator(
          task_id='run_remote_ed_data_quality_20min',
          bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
          ssh_hook=sshHookEtl,
          retries=0,
          dag=dag)
  )

1
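It is nearly impossible to tell the difference between your answer and the example in the question. Please consider including only the changed lines and explaining the change. - dlamblin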

1

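Well, yes: you can set concurrency directly on the DAG object. There is also a task_concurrency on the BaseOperator object. Neither SSHExecuteOperator nor BaseOperator has a concurrency parameter or field, which is why setting it on the task did not change the value shown in the DAG details.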
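
To make that concrete, here is a toy model in plain Python. It is not Airflow's code: ToyDAG, ToyOperator and the inline config string are hypothetical stand-ins. It assumes, as the question suggests, that the DAG falls back to the global dag_concurrency value when none is given, and that an unrecognised keyword on the operator never reaches the DAG.

```python
import configparser

# Hypothetical stand-in for airflow.cfg; 16 is the default value mentioned in the question.
cfg = configparser.ConfigParser()
cfg.read_string("[core]\ndag_concurrency = 16\n")


class ToyDAG:
    """Toy stand-in for a DAG: takes its own concurrency or falls back to the global one."""

    def __init__(self, dag_id, concurrency=None):
        self.dag_id = dag_id
        if concurrency is None:
            # No per-DAG value given, so use the global setting.
            concurrency = cfg.getint("core", "dag_concurrency")
        self.concurrency = concurrency


class ToyOperator:
    """Toy stand-in for an operator that has no concurrency field of its own."""

    def __init__(self, task_id, dag, **kwargs):
        self.task_id = task_id
        self.dag = dag
        # Unknown keywords are only collected here; they never modify the DAG.
        self.ignored_kwargs = kwargs


# The question's approach: concurrency passed to the task.
dag_a = ToyDAG("question_version")
ToyOperator("run_remote", dag=dag_a, concurrency=1)
print(dag_a.concurrency)  # still the global default

# The accepted answer's approach: concurrency passed to the DAG.
dag_b = ToyDAG("answer_version", concurrency=1)
ToyOperator("run_remote", dag=dag_b)
print(dag_b.concurrency)  # the per-DAG value
```

In the real library the per-task knob is task_concurrency on the operator, which limits how many instances of that one task run at the same time. The DAG-level concurrency limits running task instances across the whole DAG.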

