如何在当地时间午夜而非协调世界时午夜触发每日DAG运行

12

我位于UTC+4时区,因此当Airflow触发每晚的ETL时,这里已经是凌晨4:00了。我该如何告诉Airflow在当天的ds上运行前一天的20:00运行,但ds=ds呢?

根据文档强烈建议将所有服务器保持在UTC上,因此我正在寻找应用层面的解决方案。

编辑:一个笨拙的解决方案是将其定义为每天在晚上8点运行,所以是“前一天”,但然后在作业中使用tomorrow_ds而不是ds。但这仍然在Airflow UI上看起来很奇怪,因为它将显示UTC执行时间。

3个回答

14

调度间隔也可以是“cron表达式”,这意味着您可以轻松地在20:00 UTC运行它。再加上“user_defined_filters”的话,这意味着您可以通过一些技巧获得所需的行为:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

import pytz
tz = pytz.timezone('Asia/Dubai')


def localize_utc_tz(d):
    return tz.fromutc(d)

default_args = {
    'start_date': datetime(2017, 11, 8),
}
dag = DAG(
    'plus_4_utc',
    default_args=default_args,
    schedule_interval='0 20 * * *',
    user_defined_filters={
        'localtz': localize_utc_tz,
    },
)
task = BashOperator(
        task_id='task_for_testing_file_log_handler',
        dag=dag,
        bash_command='echo UTC {{ ts }}, Local {{ execution_date | localtz }} next {{ next_execution_date | localtz }}',
)

这将输出:

UTC 2017-11-08T20:00:00, 本地时间 2017-11-09 00:00:00+04:00, 下一个时间点为 2017-11-10 00:00:00+04:00

您需要注意使用的“变量类型”。例如,dsts是字符串而不是日期时间对象,这意味着过滤器无法对它们起作用。


这对于Airflow UI中显示的时间没有太大帮助,遗憾地说 :( - Ash Berlin-Taylor
Airflow团队的核心开发人员已经开始着手修复这个问题,这意味着您将能够执行start_date = datetime(2017, 1, 1, tzinfo=“Europe/Amsterdam”)。请参见https://github.com/apache/incubator-airflow/pull/2781以跟踪此问题。它可能会在Airflow 1.10中得到解决。 - Ash Berlin-Taylor
1
Airflow 1.10已经发布,但实际上它并没有帮助,因为模板中的execution_date仍然保持在UTC时间,除非你自己进行转换... https://airflow.apache.org/timezone.html#templates 这是你在回答中所做的。真的不知道为什么他们引入了时区感知DAG,但只适用于触发DAG,其余部分仍然像以前一样工作。 - Fan
是的,UI 显示不正确(因为 Airflow 的核心操作在 UTC 中运行)https://issues.apache.org/jira/browse/AIRFLOW-2805 是更新 UI 中显示时区的票据。 - Ash Berlin-Taylor
这对我有用: tz = pendulum.timezone("Asia/Tehran") def local_ds(d): return tz.fromutc(d).date() - ArefehTam

2

我也遇到了同样的问题。我有每天、每小时和每半小时的任务。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Calcutta")

args = {
    'owner': 'ganesh',
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 25, tzinfo=local_tz),
    'email': ['abcd@test.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test1',
    default_args=args,
    schedule_interval='30 00 * * *'
    )

first_date = BashOperator(
    task_id='first_date'
    ,
    bash_command='date'
    , dag=dag, env=None, output_encoding='utf-8')

second_date = BashOperator(
    task_id='second_date'
    ,
    bash_command='echo date'
    , dag=dag, env=None, output_encoding='utf-8')

first_date >> second_date




0

从10月28日提交的代码使得Airflow具备了时区感知能力。 - Breathe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接