为什么Airflow中的任务会卡住不执行?

12
我正在使用BigQueryOperator尝试Airflow。我想以后会使用Google Composer,但是我希望先在本地运行它。我已经成功地运行了Airflow和BashOperator,我还可以运行airflow test <dag> <task>,其中task是我想要运行的大查询任务,但是当我从UI触发DAG时,bigquery任务从未排队。相反,它们处于REMOVED状态,什么也不会发生。
我的DAG定义如下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

yesterday = datetime.combine(
    datetime.today() - timedelta(1),
datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        'tutorial', default_args=default_args) as dag:

operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date')

    template_sql = '''
            SELECT 'TOMAS' name, '{{ params.my_value }}' value, "{{ params.my_value2 }}" tables
        '''

    sampleBigQuery1 = BigQueryOperator(
        task_id='bq_simple_sql1',
        bql=template_sql,
        use_legacy_sql=False,
        destination_dataset_table='temp_tomas.airflow_1',
        allow_large_results=True,
        params={'my_value': (datetime.now()).strftime("%D %T"),
                'my_value2': "yolo"},  # getTables()},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE'
    )

    t1 >> sampleBigQuery1

那么,当使用airflow test ...运行时正常,但是通过调度程序或UI触发时不正常的情况下,我该如何调试?这里似乎有什么问题吗?

在本地,我使用标准的airflow安装和sqllite,但我认为这不应该有任何影响。我将所有内容都运行在一个Python环境中,因此应该相当独立。

1个回答

8
如果这是您第一次设置Airflow,您可能想首先检查以下内容:Airflow 1.9.0排队但未启动任务。此外,我尤其推荐最后一个步骤:如果其他方法均不起作用,您可以使用Web UI单击DAG,然后单击Graph View。现在选择第一个任务并单击Task Instance。在段落Task Instance Details中,您将看到为什么DAG正在等待或未运行的原因。这可能会让您更加了解为什么任务未被调度。

我已经检查了实例的详细信息,状态为:任务处于“已删除”状态,这不是执行的有效状态。必须清除任务才能运行。但我不知道为什么它处于“已删除”状态。正如我上面所说,如果我尝试在特定任务上运行airflow test,它可以工作。此外,总体而言,其他所有内容都可以正常运行。我可以运行由多个BashOperator组成的DAG,没有任何问题。 - Tomas Jansson
@TomasJansson,你也尝试过使用airflow clear清理该任务了吗? - tobi6
这很奇怪,因为 REMOVED 状态通常是用于已经存在但已从 DAG 中删除的任务。 - tobi6
4
我重命名了文件和DAG的名称,将文件再次复制到dags文件夹中,现在它似乎可以正常工作了。 - Tomas Jansson
1
感谢,@TomasJansson。你应该把你的评论作为问题的答案发布。对我来说,重命名DAG解决了它,至少暂时解决了。 - Jack Davidson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接