我创建了一个DAG并按照每日的频率进行了调度。它每天都排队,但是任务实际上没有运行。这个问题在过去已经被提出过here,但是答案对我没有帮助,所以似乎存在另一个问题。
我的代码如下所示。我用注释替换了任务t2的SQL。当我使用“airflow test…”单独运行它们时,每个任务都成功运行。
请问应该采取什么措施使DAG运行起来呢? 谢谢!
以下是DAG代码:
我的代码如下所示。我用注释替换了任务t2的SQL。当我使用“airflow test…”单独运行它们时,每个任务都成功运行。
请问应该采取什么措施使DAG运行起来呢? 谢谢!
以下是DAG代码:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner' : 'me',
'depends_on_past' : 'true',
'start_date' : datetime(2018, 06, 25),
'email' : ['myemail@moovit.com'],
'email_on_failure':True,
'email_on_retry':False,
'retries' : 2,
'retry_delay' : timedelta(minutes=5)
}
dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)
t1 = BigQueryOperator(
task_id='bq_delete_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql='''
delete `my_project.agg.my_agg_table`
where date = '{{ macros.ds_add(ds, -1)}}'
''',
dag=dag)
t2 = BigQueryOperator(
task_id='bq_insert_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
allow_large_results=True,
bql='''
#standardSQL
Select ... the query continue here.....
''', destination_dataset_table='my_project.agg.my_agg_table',
dag=dag)
t1 >> t2