Kubernetes执行器在Airflow中无法并行执行子DAG。

9

由于执行的一些限制,我们在Airflow 1.10.0中放弃了Celery Executor,并使用KubernetesExecutor。

现在,即使我们直接在代码中更改subdag_operator,有些DAG中的所有任务仍无法并行化:

https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38

我们的期望是通过这些修改和使用Kubernetes Executors,我们可以同时扇出所有任务的执行,但我们具有相同的SequentialExecutor行为。

这是我们现在拥有的行为:

enter image description here

我们希望使用KubernetesExecutor同时执行所有任务。


1
当在DAG中并行执行任务时,k8s执行器在我的Airflow中运作良好。我建议您使用最新的Airflow版本重试,因为k8s执行器是相当新的。 - shawnzhu
2
你好@shawmzhu,他们在几个分支之前修复了它,这个问题仍然存在于先前的版本(2018年11月)。但还是谢谢。 - Flavio
你是否将子类更改为使用KubernetesExecutor作为默认执行器,而不是SequentialExecutor? - bamdan
1个回答

1
Kubernetes Executor在Airflow中将所有一级任务转换为带有本地执行器的工作人员Pod。这意味着您将获得本地执行器来执行您的SubDagOperator。
为了在生成工作人员Pod后运行SubDagOperator下的任务,您需要为工作人员Pod指定配置parallelism。因此,如果您正在使用YAML格式的工作人员Pod,则需要将其编辑为以下内容。
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      env:
        ###################################
        # This is the part you need to add
        ###################################
        - name: AIRFLOW__CORE__PARALLELISM
          value: 10
        ###################################
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      envFrom: []
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: false
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
          subPath: repo/tests/dags
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector:
    {}
  affinity:
    {}
  tolerations:
    []
  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-local-settings

然后,SubDagOperator会遵循指定的parallelism并行运行任务。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接