基于文件系统变化触发Airflow DAG

4
我正在尝试编写一个管道,其中postgres数据库应在将csv文件带到文件夹时更新其内容。我编写了一个dag,当从Web UI触发它时,它会创建表并推送csv内容。以下是代码:
from datetime import datetime
from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2

with DAG('Write_data_to_PG', description='This DAG is for writing data to postgres.', 
    schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1), catchup=False) as dag:
    create_table = PostgresOperator(
        task_id='create_table',
        sql="""CREATE TABLE users(
            id integer PRIMARY KEY,
            email text,
            name text,
            address text
        )
        """,
    )

    def my_func():
        print('Pushing data in database.')
        conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
        print(conn)

        cur = conn.cursor()
        print(cur)

        with open('test.csv', 'r') as f:
            next(f)  # Skip the header row.
            cur.copy_from(f, 'users', sep=',')

        conn.commit()
        print(conn)
        print('DONE!!!!!!!!!!!.')


    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    create_table >> python_task

我无法弄清楚如何在手动将csv粘贴/带入文件夹时触发任务。希望能得到帮助,谢谢。


您可以创建一个DAG,定期检查是否有新文件移动到文件夹中,如果有文件被移动,则从那里触发另一个DAG。 - neilharia7
2个回答

5

原来Airflow有专门用于此类需求的特殊模块。我使用Airflow自带的FileSensor解决了这个问题。

根据文档:

FileSensor 等待文件或文件夹落入文件系统。如果所给路径是一个目录,那么只有当目录内存在任何文件(直接或在子目录中)时,该传感器才会返回 true。

下面是修改后的代码,它等待名为test.csv的文件,并仅在在Airflow文件夹(或任何文件夹,需要指定路径)中找到该文件时才继续执行下一个任务:

from datetime import datetime
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2

with DAG('Write_data_to_PG', description='This DAG is for writing data to postgres.', schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1), catchup=False) as dag:
    create_table = PostgresOperator(
        task_id='create_table',
        sql="""CREATE TABLE users(
            id integer PRIMARY KEY,
            email text,
            name text,
            address text
        )
        """,
    )


    def my_func():
        print('Creating table in database.')
        conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
        print(conn)

        cur = conn.cursor()
        print(cur)

        with open('test.csv', 'r') as f:
            next(f)  # Skip the header row.
            cur.copy_from(f, 'users', sep=',')

        conn.commit()
        print(conn)
        print('DONE!!!!!!!!!!!.')


    file_sensing_task = FileSensor(task_id='sense_the_csv',
                                   filepath='test.csv',
                                   fs_conn_id='my_file_system',
                                   poke_interval=10)

    python_task = PythonOperator(task_id='populate_data', python_callable=my_func)

    create_table >> file_sensing_task >> python_task

2
听起来你正在寻找文件系统写入事件。
对于更低级别的Linux,请查看inotify: https://pypi.org/project/inotify/ 对于在Mac或Windows上也适用的更高级实现:https://pypi.org/project/watchdog/ 这个想法是添加事件监视器/事件处理程序,以便捕捉文件/目录的修改。该事件将包含新创建/修改的文件的文件路径。

1
谢谢你的回答!看门狗帮助我监听文件更改,但我无法弄清楚如何在文件系统返回该文件的真实值时触发dag。最终,我使用airflow提供的FileSensor解决了这个问题。 - Yudhishthir Singh
1
很高兴听到这个消息,我之前不知道有FileSensor,所以今天学到了新东西 ;) - smassey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接