我使用Airflow的Python operators来执行针对Redshift/Postgres数据库的SQL查询。为了进行调试,我希望DAG返回SQL执行结果,类似于在控制台本地执行时看到的结果:
我使用
我不太了解Airflow或Python DBAPI的低级工作原理,但是
我觉得很困惑,因为我想象中运行ETLs的主要用例应该是能够轻松做到这一点。我听说可以简单地创建额外的任务,在查询之前和之后查询该表,但这似乎很笨拙且效果不佳。
请问是否有人能够解释如何实现这一点,如果不能,请解释原因?欢迎使用其他方法来实现类似的结果。谢谢!
到目前为止,我尝试了
我希望记录器能够返回类似以下的内容:
我使用
psycopg2
创建连接/游标并执行SQL。记录这些信息将非常有助于确认解析后的参数化SQL,并确认实际插入的数据(我曾经痛苦地遇到过由于环境差异导致的意外行为)。我不太了解Airflow或Python DBAPI的低级工作原理,但是
pscyopg2
文档似乎提到了一些方法和连接配置,可以实现这一点。我觉得很困惑,因为我想象中运行ETLs的主要用例应该是能够轻松做到这一点。我听说可以简单地创建额外的任务,在查询之前和之后查询该表,但这似乎很笨拙且效果不佳。
请问是否有人能够解释如何实现这一点,如果不能,请解释原因?欢迎使用其他方法来实现类似的结果。谢谢!
到目前为止,我尝试了
connection.status_message()
方法,但它似乎只返回SQL的第一行而不是结果。我还尝试创建日志游标,但它只产生了SQL,而不是控制台结果。import logging
import psycopg2 as pg
from psycopg2.extras import LoggingConnection
conn = pg.connect(
connection_factory=LoggingConnection,
...
)
conn.autocommit = True
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
conn.initialize(logger)
cur = conn.cursor()
sql = """
INSERT INTO mytable (
SELECT *
FROM other_table
);
"""
cur.execute(sql)
我希望记录器能够返回类似以下的内容:
sql> INSERT INTO mytable (
SELECT ...
[2019-07-25 23:00:54] 912 rows affected in 4 s 442 ms
cur.messages
方法,只有cur.statusmessage
。 - Jonathan