无法在Airflow中部署DAG

3

我无法在Airflow中部署DAG,一直出现相同的错误。正在运行8080端口,调度器也在运行。

Running %s on host %s <TaskInstance: bworkflow_dag.bworkflow_template 2020-08-11T00:00:00+00:00 [failed]> airflow-instance-test.c.cc-data-sandbox.internal
[2020-08-11 14:46:29,030] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-08-11 14:46:29,031] {dagbag.py:396} INFO - Filling up the DagBag from /home/kshitij/airflow/dags
/home/kshitij/.local/lib/python3.5/site-packages/airflow/models/dag.py:1342: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id create_tag_template_field_result is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
  category=PendingDeprecationWarning)
Running %s on host %s <TaskInstance: bworkflow_dag.bworkflow_template 2020-08-11T00:00:00+00:00 [failed]> airflow-instance-test.c.cc-data-sandbox.internal

我正在使用正确的路径,即~/airflow/dags

以下是代码片段:

from builtins import range
from datetime import timedelta
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.contrib.operators.dataproc_operator import DataprocWorkflowTemplateInstantiateOperator

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='workflow_dag',
    default_args=args,
    schedule_interval=None,
    dagrun_timeout=timedelta(days=1),
)

workflow_template = DataprocWorkflowTemplateInstantiateOperator(
    template_id="workflow_rds",
    project_id="<project name>",
    task_id="workflow_template",
    dag=dag)


workflow_template


if __name__ == "__main__":
    dag.cli()

这是一个单任务DAG。

请告诉我我的错误在哪里。


那最后三行(从workflow_template任务变量空白引用到id __name__ == "__main__"dag.cli())是不必要的,请将它们删除[特别是dag.cli():我不确定它的作用,但这可能会导致问题]。 - y2k-shubham
你正在使用Google Cloud Composer吗?如果是的话,我建议标记它们并删除Cloud Dataproc。 - Gaurangi Saxena
@GaurangiSaxena 我已经在GCP上托管了一个虚拟机,并在端口8080上使用airflow。 - Kshitij Bhadage
@y2k-shubham 成功了。 注释掉这个 __name__ == "__main__" and dag.cli() - Kshitij Bhadage
@y2k-shubham你能把它放在回答里,这样我就可以标记为已完成吗? - Kshitij Bhadage
1个回答

3

如评论中所讨论的,以下行不必要且需要被删除。

...
workflow_template


if __name__ == "__main__":
    dag.cli()

逐个进行

  • workflow_template:这什么也不做,只是引用一个保存task的变量(删除它只是清理工作)
  • if __name__ == "__main__"::意在仅在文件作为主程序执行时执行某些代码。在DAG文件中不需要
  • dag.cli():我认为这是罪魁祸首。我以前没有使用过这种方法,但docstring说它会公开特定于此DAG的CLI,这会为解析DAG定义文件(实际上是webserver)创建问题

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接