Airflow DAG执行成功但任务未运行。

7

我在airflow中有一个DAG,其中有一个任务(Python操作器),我强制在GUI中运行该任务,并且它获得了成功状态。然而,该任务未被执行,因此DAG什么也没有做。以下是dag代码:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks import MySqlHook
import pandas as pd
import datetime as dt
import json
from datetime import timedelta

default_args = {
        'owner': 'airflow',
        'start_date': dt.datetime(2019,8,29,18,0,0),
        'concurrency':1,
        'retries':3
        }

def extraction_from_raw_data(conn_id):
    mysqlserver = MySqlHook(conn_id)
    query = """select * from antifraud.email_fraud_risk
            WHERE ts >= DATE_ADD(CURDATE(), INTERVAL -3 DAY)"""
    raw_data = mysqlserver.get_records(query)
    raw_data = pd.DataFrame(raw_data)

    data_as_list = []

    for i in range(len(raw_data)):
        dict1 = {}
        dict1.update(json.loads(raw_data.at[i,'raw_content']))
        data_as_list.append(dict1)

    json_data_df = pd.DataFrame(data_as_list)

    final_data = pd.concat([raw_data['email_id'],json_data_df],axis=1)

    return final_data

with DAG('emailage_data',
         default_args=default_args,
         schedule_interval = timedelta(days=1)
         ) as dag:
    extraction_from_raw_data = PythonOperator(
    task_id = 'extraction_from_raw_data',
    op_args = {'conn_id':'services'},
    python_callable = extraction_from_raw_data)

extraction_from_raw_data

所有的Worker、Scheduler和Web服务器都运行正常,因为我成功地运行了一个hello_world DAG(以及它所包含的任务)。


尝试将您的开始日期提前一天。 - absolutelydevastated
那正是问题所在。非常感谢。您可以发布一个答案,这样我就可以接受它了。 - Javier Lopez Tomas
1个回答

7
一般来说,您应该始终在DAG的开始时间和当前时间之间留出额外的间隔。
Airflow文档指出:
请注意,如果您以一天的schedule_interval运行DAG,则标记为2016-01-01的运行将在2016-01-01T23:59后不久触发。换句话说,作业实例是在其覆盖的周期结束后启动的。
让我们重复一遍:调度程序会在开始日期之后的一个schedule_interval后运行作业,在期间结束时运行。

https://airflow.apache.org/scheduler.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接