我位于UTC+4时区,因此当Airflow触发每晚的ETL时,这里已经是凌晨4:00了。我该如何告诉Airflow在当天的ds上运行前一天的20:00运行,但ds=ds呢?
根据文档强烈建议将所有服务器保持在UTC上,因此我正在寻找应用层面的解决方案。
编辑:一个笨拙的解决方案是将其定义为每天在晚上8点运行,所以是“前一天”,但然后在作业中使用tomorrow_ds
而不是ds
。但这仍然在Airflow UI上看起来很奇怪,因为它将显示UTC执行时间。
我位于UTC+4时区,因此当Airflow触发每晚的ETL时,这里已经是凌晨4:00了。我该如何告诉Airflow在当天的ds上运行前一天的20:00运行,但ds=ds呢?
根据文档强烈建议将所有服务器保持在UTC上,因此我正在寻找应用层面的解决方案。
编辑:一个笨拙的解决方案是将其定义为每天在晚上8点运行,所以是“前一天”,但然后在作业中使用tomorrow_ds
而不是ds
。但这仍然在Airflow UI上看起来很奇怪,因为它将显示UTC执行时间。
调度间隔也可以是“cron表达式”,这意味着您可以轻松地在20:00 UTC运行它。再加上“user_defined_filters”的话,这意味着您可以通过一些技巧获得所需的行为:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
import pytz
tz = pytz.timezone('Asia/Dubai')
def localize_utc_tz(d):
return tz.fromutc(d)
default_args = {
'start_date': datetime(2017, 11, 8),
}
dag = DAG(
'plus_4_utc',
default_args=default_args,
schedule_interval='0 20 * * *',
user_defined_filters={
'localtz': localize_utc_tz,
},
)
task = BashOperator(
task_id='task_for_testing_file_log_handler',
dag=dag,
bash_command='echo UTC {{ ts }}, Local {{ execution_date | localtz }} next {{ next_execution_date | localtz }}',
)
这将输出:
UTC 2017-11-08T20:00:00, 本地时间 2017-11-09 00:00:00+04:00, 下一个时间点为 2017-11-10 00:00:00+04:00
您需要注意使用的“变量类型”。例如,ds
和ts
是字符串而不是日期时间对象,这意味着过滤器无法对它们起作用。
我也遇到了同样的问题。我有每天、每小时和每半小时的任务。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum
local_tz = pendulum.timezone("Asia/Calcutta")
args = {
'owner': 'ganesh',
'depends_on_past': False,
'start_date': datetime(2020, 3, 25, tzinfo=local_tz),
'email': ['abcd@test.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='test1',
default_args=args,
schedule_interval='30 00 * * *'
)
first_date = BashOperator(
task_id='first_date'
,
bash_command='date'
, dag=dag, env=None, output_encoding='utf-8')
second_date = BashOperator(
task_id='second_date'
,
bash_command='echo date'
, dag=dag, env=None, output_encoding='utf-8')
first_date >> second_date
start_date = datetime(2017, 1, 1, tzinfo=“Europe/Amsterdam”)
。请参见https://github.com/apache/incubator-airflow/pull/2781以跟踪此问题。它可能会在Airflow 1.10中得到解决。 - Ash Berlin-Taylor