我正在使用
我的DAG定义如下:
BigQueryOperator
尝试Airflow。我想以后会使用Google Composer,但是我希望先在本地运行它。我已经成功地运行了Airflow和BashOperator
,我还可以运行airflow test <dag> <task>
,其中task
是我想要运行的大查询任务,但是当我从UI触发DAG时,bigquery任务从未排队。相反,它们处于REMOVED
状态,什么也不会发生。我的DAG定义如下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
yesterday = datetime.combine(
datetime.today() - timedelta(1),
datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'tutorial', default_args=default_args) as dag:
operators
t1 = BashOperator(
task_id='print_date',
bash_command='date')
template_sql = '''
SELECT 'TOMAS' name, '{{ params.my_value }}' value, "{{ params.my_value2 }}" tables
'''
sampleBigQuery1 = BigQueryOperator(
task_id='bq_simple_sql1',
bql=template_sql,
use_legacy_sql=False,
destination_dataset_table='temp_tomas.airflow_1',
allow_large_results=True,
params={'my_value': (datetime.now()).strftime("%D %T"),
'my_value2': "yolo"}, # getTables()},
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE'
)
t1 >> sampleBigQuery1
那么,当使用airflow test ...
运行时正常,但是通过调度程序或UI触发时不正常的情况下,我该如何调试?这里似乎有什么问题吗?
在本地,我使用标准的airflow安装和sqllite,但我认为这不应该有任何影响。我将所有内容都运行在一个Python环境中,因此应该相当独立。
任务处于“已删除”状态,这不是执行的有效状态。必须清除任务才能运行。
但我不知道为什么它处于“已删除”状态。正如我上面所说,如果我尝试在特定任务上运行airflow test
,它可以工作。此外,总体而言,其他所有内容都可以正常运行。我可以运行由多个BashOperator
组成的DAG,没有任何问题。 - Tomas Janssonairflow clear
清理该任务了吗? - tobi6