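I am trying to write a pipeline in which a Postgres database is updated with the contents of a CSV file whenever that file is dropped into a folder. I have written a DAG that creates the table and pushes the CSV contents when it is triggered from the web UI. Here is the code: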
from datetime import datetime
from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2

with DAG('Write_data_to_PG', description='This DAG is for writing data to postgres.',
         schedule_interval='*/5 * * * *',
         start_date=datetime(2018, 11, 1), catchup=False) as dag:

    # IF NOT EXISTS keeps this task from failing on every run after the first.
    create_table = PostgresOperator(
        task_id='create_table',
        sql="""CREATE TABLE IF NOT EXISTS users(
            id integer PRIMARY KEY,
            email text,
            name text,
            address text
            )
            """,
    )

    def my_func():
        print('Pushing data in database.')
        conn = psycopg2.connect("host=localhost dbname=testdb user=testuser")
        print(conn)
        try:
            cur = conn.cursor()
            print(cur)
            with open('test.csv', 'r') as f:
                next(f)  # Skip the header row.
                cur.copy_from(f, 'users', sep=',')
            conn.commit()
        finally:
            conn.close()  # Release the connection even if the copy fails.
        print('DONE!!!!!!!!!!!.')

    python_task = PythonOperator(task_id='python_task', python_callable=my_func)

    # Create the table first, then load the CSV.
    create_table >> python_task
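I cannot figure out how to trigger the tasks when a CSV file is manually pasted or moved into the folder. Any help would be appreciated, thanks.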
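One possible direction, as an untested sketch rather than a confirmed solution: since the DAG already runs every 5 minutes, each run could check an "inbox" folder for CSV files and move each file elsewhere once it has been loaded, so the same file is not pushed twice. The helper names (`pending_csv_files`, `inbox_has_work`, `mark_processed`) and the inbox/processed folder layout below are hypothetical and not part of the DAG above. The code uses only the standard library.

```python
import glob
import os
import shutil


def pending_csv_files(inbox_dir):
    """Return the CSV files currently waiting in inbox_dir, sorted by name."""
    return sorted(glob.glob(os.path.join(inbox_dir, "*.csv")))


def inbox_has_work(inbox_dir):
    """Return True if at least one CSV file is waiting in inbox_dir."""
    return bool(pending_csv_files(inbox_dir))


def mark_processed(path, processed_dir):
    """Move a loaded file out of the inbox so a later run does not reload it.

    processed_dir is a hypothetical archive folder; it is created if missing.
    Returns the new location of the file.
    """
    os.makedirs(processed_dir, exist_ok=True)
    target = os.path.join(processed_dir, os.path.basename(path))
    shutil.move(path, target)
    return target
```

A callable like `inbox_has_work` returns a plain True/False, so it might be used as the `python_callable` of a `ShortCircuitOperator` placed before `create_table`, which skips the downstream tasks when nothing is waiting. Airflow also ships a `FileSensor` that waits for a path to appear, which may be a better fit if the file name is known in advance. In either case `my_func` would need to read the paths returned by `pending_csv_files` instead of the hard-coded `test.csv`.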