Airflow自定义jinja2过滤器

7
我正在尝试为我的Airflow Jinja2模板添加自定义过滤器。由于我的S3文件夹格式如下:“/年/月/日/”,所以我的目的是在变量屏幕中使用yesterday_ds,如下所示:

s3://logs.web.com/AWSLogs/{{ yesterday_ds | get_year }}/{{ yesterday_ds | get_month }}/{{ yesterday_ds | get_day }}/。

我在PR中看到(我认为已经合并了...)可以在dag对象创建中的dag_args参数中使用“user_defined_filters”参数来实现这一点,链接在此处。但问题是,即使这样做,它仍然会显示“没有名为get_year的过滤器”。以下是我的代码: dag.py。
   dag = DAG(
        dag_id='dag-name',
        default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
        template_searchpath=tmpl_search_path,
        schedule_interval=timedelta(days=1),
        max_active_runs=1,
        )

utils.py

def get_dag_args(**kwargs):
return {
    'owner'               : kwargs.get('owner', 'owner_name'),
    'depends_on_past'     : kwargs.get('depends_on_past', False),
    'start_date'          : kwargs.get('start_date', datetime.now() - timedelta(1)),
    'email'               : kwargs.get('email', ['blabla@blabla.com']),
    'retries'             : kwargs.get('retries', 5),
    'provide_context'     : kwargs.get('provide_context', True),
    'retry_delay'         : kwargs.get('retry_delay', timedelta(minutes=5)),
    'user_defined_filters': get_date_filters()
    }


def get_date_filters():
    return dict(
        get_year=lambda date_string: date_string.strftime('%Y'),
        get_month=lambda date_string: date_string.strftime('%m'),
        get_day=lambda date_string: date_string.strftime('%d'),
        )

有人能看出我的错误在哪里吗?谢谢!

编辑

在dag定义之后打印这个,不幸的是没有自定义的过滤器。

jinja_env = dag.get_template_env()
print(jinja_env.filters)

此外,如果我尝试将其直接添加为DAG对象参数,就像在tests @ tests / models.py中显示的那样:
Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'

编辑 2

我看到的是我使用的是1.8.0版本,这个版本没有过滤器。有人知道如何通过pip下载1.8.2rc版本吗?还是不行?


谢谢您对此事的跟进,是的,您的用法看起来是正确的。我建议将问题标题更改为包括 airflow v1.8.0,因为这里的问题和答案似乎仅限于该范围。 - 7yl4r
2个回答

4

Airflow现在支持自定义过滤器和宏。

工作代码示例:

from airflow import DAG
from datetime import datetime, timedelta

def first_day_of_month(any_day):
    return any_day.replace(day=1)


def last_day_of_month(any_day):
    next_month = any_day.replace(day=28) + timedelta(days=4)  # this will never fail
    return next_month - timedelta(days=next_month.day)


def isoformat_month(any_date):
    return any_date.strftime("%Y-%m")


with DAG(
        dag_id='generate_raw_logs',
        default_args=default_args,
        schedule_interval=timedelta(minutes=120),
        catchup=False,
        user_defined_macros={
            'first_day_of_month': first_day_of_month,
            'last_day_of_month': last_day_of_month,
        },
        user_defined_filters={
            'isoformat_month': isoformat_month
        }
)

0

Airflow的打包名称已在pip上更改。现在可以使用pip install apache-airflow来下载1.8.2rc1版本。

此外,请注意根据邮件列表,他们目前正在将1.8.2rc4版本发布为1.8.2版本。


它在文档的资源和链接章节中有描述(https://airflow.incubator.apache.org/project.html)。 - faeder

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接