我正在尝试为我的Airflow Jinja2模板添加自定义过滤器。由于我的S3文件夹格式如下:“/年/月/日/”,所以我的目的是在变量屏幕中使用yesterday_ds,如下所示:
s3://logs.web.com/AWSLogs/{{ yesterday_ds | get_year }}/{{ yesterday_ds | get_month }}/{{ yesterday_ds | get_day }}/。
我在PR中看到(我认为已经合并了...)可以在dag对象创建中的dag_args参数中使用“user_defined_filters”参数来实现这一点,链接在此处。但问题是,即使这样做,它仍然会显示“没有名为get_year的过滤器”。以下是我的代码: dag.py。
此外,如果我尝试将其直接添加为DAG对象参数,就像在tests @ tests / models.py中显示的那样:
s3://logs.web.com/AWSLogs/{{ yesterday_ds | get_year }}/{{ yesterday_ds | get_month }}/{{ yesterday_ds | get_day }}/。
我在PR中看到(我认为已经合并了...)可以在dag对象创建中的dag_args参数中使用“user_defined_filters”参数来实现这一点,链接在此处。但问题是,即使这样做,它仍然会显示“没有名为get_year的过滤器”。以下是我的代码: dag.py。
dag = DAG(
dag_id='dag-name',
default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
template_searchpath=tmpl_search_path,
schedule_interval=timedelta(days=1),
max_active_runs=1,
)
utils.py
def get_dag_args(**kwargs):
return {
'owner' : kwargs.get('owner', 'owner_name'),
'depends_on_past' : kwargs.get('depends_on_past', False),
'start_date' : kwargs.get('start_date', datetime.now() - timedelta(1)),
'email' : kwargs.get('email', ['blabla@blabla.com']),
'retries' : kwargs.get('retries', 5),
'provide_context' : kwargs.get('provide_context', True),
'retry_delay' : kwargs.get('retry_delay', timedelta(minutes=5)),
'user_defined_filters': get_date_filters()
}
def get_date_filters():
return dict(
get_year=lambda date_string: date_string.strftime('%Y'),
get_month=lambda date_string: date_string.strftime('%m'),
get_day=lambda date_string: date_string.strftime('%d'),
)
有人能看出我的错误在哪里吗?谢谢!
编辑
在dag定义之后打印这个,不幸的是没有自定义的过滤器。
jinja_env = dag.get_template_env()
print(jinja_env.filters)
此外,如果我尝试将其直接添加为DAG对象参数,就像在tests @ tests / models.py中显示的那样:
Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'
编辑 2
我看到的是我使用的是1.8.0版本,这个版本没有过滤器。有人知道如何通过pip下载1.8.2rc版本吗?还是不行?
airflow v1.8.0
,因为这里的问题和答案似乎仅限于该范围。 - 7yl4r