如何在Airflow中记录SQL执行结果?

5
我使用Airflow的Python operators来执行针对Redshift/Postgres数据库的SQL查询。为了进行调试,我希望DAG返回SQL执行结果,类似于在控制台本地执行时看到的结果:
我使用psycopg2创建连接/游标并执行SQL。记录这些信息将非常有助于确认解析后的参数化SQL,并确认实际插入的数据(我曾经痛苦地遇到过由于环境差异导致的意外行为)。
我不太了解Airflow或Python DBAPI的低级工作原理,但是pscyopg2文档似乎提到了一些方法和连接配置,可以实现这一点。
我觉得很困惑,因为我想象中运行ETLs的主要用例应该是能够轻松做到这一点。我听说可以简单地创建额外的任务,在查询之前和之后查询该表,但这似乎很笨拙且效果不佳。
请问是否有人能够解释如何实现这一点,如果不能,请解释原因?欢迎使用其他方法来实现类似的结果。谢谢!
到目前为止,我尝试了connection.status_message()方法,但它似乎只返回SQL的第一行而不是结果。我还尝试创建日志游标,但它只产生了SQL,而不是控制台结果。
import logging
import psycopg2 as pg
from psycopg2.extras import LoggingConnection

conn = pg.connect(
    connection_factory=LoggingConnection,
    ...
)
conn.autocommit = True

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
conn.initialize(logger)

cur = conn.cursor()

sql = """    
    INSERT INTO mytable (
    SELECT *
    FROM other_table
    );
"""

cur.execute(sql)

我希望记录器能够返回类似以下的内容:
sql> INSERT INTO mytable (
     SELECT ...
[2019-07-25 23:00:54] 912 rows affected in 4 s 442 ms
2个回答

1
好的,经过一些试错,我找到了适合我的设置和目标的方法。回顾一下,我的目标是通过Python脚本在Airflow中编排运行ETL。查看statusmessage的文档:
“只读属性,包含上一条命令返回的消息:”
关键是要在与服务器执行的事务上下文中管理日志记录。为了做到这一点,我必须明确地设置con.autocommit = False,并用BEGIN TRANSACTION;END TRANSACTION; 包装SQL块。如果您在删除或插入语句之后直接插入 cur.statusmessage,您将得到类似于'INSERT 0 92380'的响应。
这仍然不如我所希望的那么详细,但它比没有好得多,并且对于在Airflow日志中排除ETL问题非常有用。
副笔: - 当autocommit设置为False时,必须显式提交事务。 - 在您的SQL中陈述事务开始/结束可能是不必要的。这可能取决于您的数据库版本。
con = psy.connect(...)
con.autocommit = False
cur = con.cursor()

try:
    cur.execute([some_sql])
    logging.info(f"Cursor statusmessage: {cur.statusmessage})
except:
    con.rollback()
finally:
    con.close()

Psycopg2 中有一些潜在的功能可以被利用,但是文档相当薄弱,也没有清晰的示例。如果有人有关于如何利用 logobjects 或返回 join PID 以某种方式检索其他信息的建议,请提供。


1
假设您正在编写一个使用Postgres钩子执行SQL操作的运算符。
运算符中打印的任何内容都会被记录在日志中。
因此,如果您想记录语句,请在运算符中打印该语句。
print(sql)

如果你想记录结果,获取结果并打印结果。例如:
result = cur.fetchall()
for row in result:
    print(row)

或者您可以使用self.log.info代替print,其中self指的是运算符实例。


你好,你使用的psycopg版本是哪个?对我来说,没有cur.messages方法,只有cur.statusmessage - Jonathan
@Jonathan 好的,我已经删除了那一部分。主要观点仍然存在:如果你想让信息记录到日志中,就打印它或使用 self.log.info。 - dstandish

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接