AirFlow DAG被卡在运行状态中

5
我创建了一个DAG并按照每日的频率进行了调度。它每天都排队,但是任务实际上没有运行。这个问题在过去已经被提出过here,但是答案对我没有帮助,所以似乎存在另一个问题。
我的代码如下所示。我用注释替换了任务t2的SQL。当我使用“airflow test…”单独运行它们时,每个任务都成功运行。
请问应该采取什么措施使DAG运行起来呢? 谢谢!
以下是DAG代码:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator



default_args = {
    'owner' : 'me',
    'depends_on_past' : 'true',
    'start_date' : datetime(2018, 06, 25),
    'email' : ['myemail@moovit.com'],
    'email_on_failure':True,
    'email_on_retry':False,
    'retries' : 2,
    'retry_delay' : timedelta(minutes=5)
}


dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)



t1 = BigQueryOperator(
    task_id='bq_delete_my_agg_table',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    delete `my_project.agg.my_agg_table`
    where date = '{{ macros.ds_add(ds, -1)}}'
    ''',
    dag=dag)

t2 = BigQueryOperator(
    task_id='bq_insert_my_agg_table',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    bql='''
    #standardSQL
    Select ... the query continue here.....
    ''',    destination_dataset_table='my_project.agg.my_agg_table',
    dag=dag)


t1 >> t2
1个回答

14

通常很容易找出任务未被运行的原因。在Airflow web UI中:

  • 选择任何感兴趣的DAG
  • 现在单击任务
  • 再次单击任务实例详情
  • 第一行中有一个面板任务实例状态
  • 在旁边的框原因中是任务正在运行或被忽略的原因

通常最好检查第一个未执行的任务,因为我看到你已经设置了 depends_on_past=True ,如果在错误的场景中使用可能会导致问题。

更多信息请参见:Airflow 1.9.0排队但不启动任务


1
谢谢,tobi6!你的指示非常有帮助,并揭示了一个可能的原因。问题还没有解决,因为DAG仍然被卡住,但这也许是另一个问题。在任务实例的详细信息中,depends_on_past现在为false,但出现了一个错误消息,称“此任务的DAG的depends_on_past为true,但上一个任务实例尚未运行”。 - Saar Porat
1
我接受了答案,谢谢。是的,在我上面添加评论之前,我重新启动了调度程序和Web服务器。 - Saar Porat
奇怪。您可能需要重命名DAG,例如添加_v1:my_agg_table_v1,然后再次检查。 - tobi6
1
谢谢。重命名已经生效了,至少在初始运行中是这样的。希望它能在每日计划运行中保持正常。 - Saar Porat

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接