Apache Airflow task timeout

I have a test dag with a single task. The task performs a simple ETL operation: it extracts data from an MSSQL database and loads it into a PostgreSQL database. The way it works is that it selects the last 360 days of data, one day at a time, and inserts each day into PostgreSQL. But the task times out on the select statement after roughly 10 days into the loop.

from datetime import datetime, timedelta

def get_receiveCars(**kwargs):
    # get the current date
    end_date = datetime.now()
    # loop back over the last 360 days, one day at a time
    for x in range(360):
        startDate = end_date - timedelta(days=x)
        delete_dataPostgres(startDate.strftime('%Y-%m-%d'), "received sample")
        select_dataMsql(startDate)

And the select statement is:

from airflow.hooks.mssql_hook import MsSqlHook

def select_dataMsql(startDate):
    # query one day's aggregates from MSSQL, then insert them into PostgreSQL
    endDate = str(startDate.strftime('%Y-%m-%d')) + " 23:59:59"
    ms_hook = MsSqlHook(mssql_conn_id='mssql_db')
    select_sql = """SELECT carColor, carBrand, fuelType, COUNT(DISTINCT RequestID) AS received
    FROM Requests
    WHERE ReceivedDateTime >= %s
      AND ReceivedDateTime < %s
    GROUP BY carColor, carBrand, fuelType"""
    cond = (startDate, endDate)
    results = ms_hook.get_records(select_sql, parameters=cond)
    insert_data(results, startDate)

This is my dag:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from src.get_receiveCars import get_receiveCars
#from src.transform_data import transform_data
#from src.load_table import load_table
import requests
import json
import os


# Define the default dag arguments.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': XXXXX,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}


# Define the dag, the start date and how frequently it runs.
# The dag is scheduled to run once a day via '@daily'.
dag = DAG(
    dag_id='receive_sample',
    default_args=default_args,
    dagrun_timeout=timedelta(minutes=200),
    schedule_interval= '@daily',
    start_date=datetime(2020, 10, 30))


# The only task runs the ETL function imported above.
mid_task = PythonOperator(
    task_id='get_receiveCars',
    provide_context=True,
    python_callable=get_receiveCars,
    dag=dag)

# Register the single task; with only one task there are no dependencies to set.
mid_task

Logs:

- Start syncing user roles.
[2020-10-30 18:29:40,238] {timeout.py:42} ERROR - Process timed out, PID: 84214
[2020-10-30 18:29:40,238] {dagbag.py:259} ERROR - Failed to import: /root/airflow/dags/receive_sample.py
Traceback (most recent call last):
  File "/root/airflow/lib/python3.6/site-packages/airflow/models/dagbag.py", line 256, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/airflow/dags/receive_sample.py", line 5, in <module>
    from src.get_receiveCars import get_receiveCars
  File "/root/airflow/dags/src/get_receiveCars.py", line 56, in <module>
    get_receiveCars()
  File "/root/airflow/dags/src/get_receiveCars.py", line 17, in get_receiveCars
    delete_data(startDate.strftime('%Y-%m-%d'), "received cars")
  File "/root/airflow/dags/src/get_receiveCars.py", line 26, in delete_data
    pg_hook.run(delete_sql, parameters=cond)
  File "/root/airflow/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py", line 172, in run
    cur.execute(s, parameters)
  File "/usr/local/lib/python3.6/encodings/utf_8.py", line 15, in decode
    def decode(input, errors='strict'):
  File "/root/airflow/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 84214
[2020-10-30 18:29:40,260] {security.py:477} INFO - Start syncing user roles.
[2020-10-30 18:29:40,350] {security.py:477} INFO - Start syncing user roles.
[2020-10-30 18:29:40,494] {security.py:387} INFO - Fetching a set of all permission, view_menu from FAB meta-table
[2020-10-30 18:29:40,550] {security.py:387} INFO - Fetching a set of all permission, view_menu from FAB meta-table
[2020-10-30 18:29:40,639] {security.py:387} INFO - Fetching a set of all per

Your DAG only ran through 10 days within a single run, is that right? - Philipp Johannis
No, the DAG runs once a day, but get_receiveCars loops 360 times, one iteration per day... and the timeout happens when the loop is at day 10 of 360. - Ray34
Is there anything meaningful printed in the logs? - Philipp Johannis
I have added the logs. - Ray34
3 Answers

Check your configuration:

airflow config list | grep -i timeout

dagbag_import_timeout = 30
dag_file_processor_timeout = 50
web_server_master_timeout = 120
web_server_worker_timeout = 120
log_fetch_timeout_sec = 5
smtp_timeout = 30
operation_timeout = 1.0
task_adoption_timeout = 600

You need to increase the dagbag_import_timeout setting so that there is enough time to load your dag.

To do that, update your airflow.cfg file or set an environment variable:

export AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=300
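
The equivalent setting in airflow.cfg lives in the [core] section:

[core]
dagbag_import_timeout = 300

Note that your traceback shows get_receiveCars() being called at module level (get_receiveCars.py, line 56, in <module>), so the whole 360-day loop runs every time the scheduler parses the DAG file - that is what hits the import timeout. A minimal sketch of guarding that call, assuming it was left in for local testing:

# at the bottom of src/get_receiveCars.py
if __name__ == "__main__":
    # runs only when the file is executed directly,
    # not when the scheduler imports the DAG file
    get_receiveCars()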

If the tasks are sitting in the queued state and being killed externally, then add the environment variable: AIRFLOW__CELERY__OPERATION_TIMEOUT: "30"
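
For example, as a plain shell export (this maps to operation_timeout in the [celery] section, so it only applies when using the Celery executor):

export AIRFLOW__CELERY__OPERATION_TIMEOUT=30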

You do not specify an execution_timeout in your default_args - I would suggest starting there:

execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance; if it goes beyond it will raise and fail.
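
For instance, a minimal sketch of adding it to your default_args (the one-hour bound here is an arbitrary placeholder - pick a limit that fits your ETL):

from datetime import timedelta

default_args = {
    # ... your existing arguments ...
    'execution_timeout': timedelta(hours=1),  # fail the task instance after 1 hour
}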

dagrun_timeout has a different meaning:

dagrun_timeout (datetime.timedelta) – specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns, and only once the number of active DagRuns == max_active_runs.


Is it always killed at the same point? Could it be that one of the databases is killing the query? - Philipp Johannis
I think you are right - with that loop, the number of connections becomes too large. - Ray34
