我无法在Airflow中部署DAG,一直出现相同的错误。正在运行8080端口,调度器也在运行。
Running %s on host %s <TaskInstance: bworkflow_dag.bworkflow_template 2020-08-11T00:00:00+00:00 [failed]> airflow-instance-test.c.cc-data-sandbox.internal
[2020-08-11 14:46:29,030] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-08-11 14:46:29,031] {dagbag.py:396} INFO - Filling up the DagBag from /home/kshitij/airflow/dags
/home/kshitij/.local/lib/python3.5/site-packages/airflow/models/dag.py:1342: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id create_tag_template_field_result is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
category=PendingDeprecationWarning)
Running %s on host %s <TaskInstance: bworkflow_dag.bworkflow_template 2020-08-11T00:00:00+00:00 [failed]> airflow-instance-test.c.cc-data-sandbox.internal
我正在使用正确的路径,即~/airflow/dags
以下是代码片段:
from builtins import range
from datetime import timedelta
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.contrib.operators.dataproc_operator import DataprocWorkflowTemplateInstantiateOperator
args = {
'owner': 'Airflow',
'start_date': days_ago(2),
}
dag = DAG(
dag_id='workflow_dag',
default_args=args,
schedule_interval=None,
dagrun_timeout=timedelta(days=1),
)
workflow_template = DataprocWorkflowTemplateInstantiateOperator(
template_id="workflow_rds",
project_id="<project name>",
task_id="workflow_template",
dag=dag)
workflow_template
if __name__ == "__main__":
dag.cli()
这是一个单任务DAG。
请告诉我我的错误在哪里。
workflow_template
任务变量空白引用到id __name__ == "__main__"
和dag.cli()
)是不必要的,请将它们删除[特别是dag.cli()
:我不确定它的作用,但这可能会导致问题]。 - y2k-shubham__name__ == "__main__" and dag.cli()
- Kshitij Bhadage