Apache Airflow 2.1.0 writes return values to the log file


I am trying to pass a pandas DataFrame from one task to another using the TaskFlow API paradigm.

Unlike version 2.0.2, Airflow now writes task return values to the log file. Because of this behavior, my entire DataFrame (84 MB) is written to the log file on every task execution.

  • I am running Airflow in Docker containers, using the 2.0.2 and 2.1.0 Docker image versions.
  • I know this is not the right way to pass large data between tasks.

I would like to know whether there is any configuration option to avoid this new behavior.
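For reference, the usual workaround for large data is to persist the DataFrame yourself and pass only its path through XCom, so that neither XCom nor the task log ever sees the data. A minimal sketch of that pattern (stdlib JSON stands in for the actual `pandas.DataFrame.to_parquet` call; the function names here are hypothetical, not from any Airflow API):

```python
import json
import tempfile
from pathlib import Path

def extract(outdir: str) -> str:
    """Write the (large) dataset to disk and return only its path.

    Only this short string would travel through XCom and appear in
    the task log, not the 84 MB payload itself.
    """
    rows = [{"id": i, "value": i * 2} for i in range(1000)]
    path = Path(outdir) / "extracted.json"
    path.write_text(json.dumps(rows))
    return str(path)

def transform(path: str) -> int:
    """Downstream task: reload the data from the path it received."""
    rows = json.loads(Path(path).read_text())
    return len(rows)

with tempfile.TemporaryDirectory() as d:
    p = extract(d)       # upstream task returns a path, not data
    n = transform(p)     # downstream task loads from that path
    print(n)             # 1000
```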

Here are some examples:

Task log in Airflow 2.0.2:

*** Reading local file: /opt/airflow/logs/procesador_tabla_ot_estructurada_nav/extraer/2021-06-15T11:43:56.695836+00:00/1.log
[2021-06-15 11:44:00,218] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: procesador_tabla_ot_estructurada_nav.extraer 2021-06-15T11:43:56.695836+00:00 [queued]>
[2021-06-15 11:44:00,235] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: procesador_tabla_ot_estructurada_nav.extraer 2021-06-15T11:43:56.695836+00:00 [queued]>
[2021-06-15 11:44:00,236] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-06-15 11:44:00,236] {taskinstance.py:1069} INFO - Starting attempt 1 of 1
[2021-06-15 11:44:00,236] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-06-15 11:44:00,245] {taskinstance.py:1089} INFO - Executing <Task(_PythonDecoratedOperator): extraer> on 2021-06-15T11:43:56.695836+00:00
[2021-06-15 11:44:00,251] {standard_task_runner.py:52} INFO - Started process 238 to run task
[2021-06-15 11:44:00,260] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'procesador_tabla_ot_estructurada_nav', 'extraer', '2021-06-15T11:43:56.695836+00:00', '--job-id', '22', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/procesador_tabla_ot_estructurada_nav.py', '--cfg-path', '/tmp/tmp23k2w273', '--error-file', '/tmp/tmpv3qqwlau']
[2021-06-15 11:44:00,262] {standard_task_runner.py:77} INFO - Job 22: Subtask extraer
[2021-06-15 11:44:00,325] {logging_mixin.py:104} INFO - Running <TaskInstance: procesador_tabla_ot_estructurada_nav.extraer 2021-06-15T11:43:56.695836+00:00 [running]> on host 25958216ea79
[2021-06-15 11:44:00,410] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=XXXXX
AIRFLOW_CTX_DAG_ID=procesador_tabla_ot_estructurada_nav
AIRFLOW_CTX_TASK_ID=extraer
AIRFLOW_CTX_EXECUTION_DATE=2021-06-15T11:43:56.695836+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-06-15T11:43:56.695836+00:00
[2021-06-15 11:44:26,166] {taskinstance.py:1185} INFO - Marking task as SUCCESS. dag_id=procesador_tabla_ot_estructurada_nav, task_id=extraer, execution_date=20210615T114356, start_date=20210615T114400, end_date=20210615T114426
[2021-06-15 11:44:26,225] {taskinstance.py:1246} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2021-06-15 11:44:26,287] {local_task_job.py:146} INFO - Task exited with return code 0

Task log file in Airflow 2.1.0:

[2021-06-15 11:29:32,592] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: procesador_tabla_ot_estructurada_nav.extraer 2021-06-15T11:29:30.550613+00:00 [queued]>
[2021-06-15 11:29:32,618] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: procesador_tabla_ot_estructurada_nav.extraer 2021-06-15T11:29:30.550613+00:00 [queued]>
[2021-06-15 11:29:32,618] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2021-06-15 11:29:32,618] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2021-06-15 11:29:32,618] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2021-06-15 11:29:32,627] {taskinstance.py:1087} INFO - Executing <Task(_PythonDecoratedOperator): extraer> on 2021-06-15T11:29:30.550613+00:00
[2021-06-15 11:29:32,633] {standard_task_runner.py:52} INFO - Started process 1110 to run task
[2021-06-15 11:29:32,637] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'procesador_tabla_ot_estructurada_nav', 'extraer', '2021-06-15T11:29:30.550613+00:00', '--job-id', '18', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/procesador_tabla_ot_estructurada_nav.py', '--cfg-path', '/tmp/tmpq89rbbgh', '--error-file', '/tmp/tmp99fa29i9']
[2021-06-15 11:29:32,639] {standard_task_runner.py:77} INFO - Job 18: Subtask extraer
[2021-06-15 11:29:32,711] {logging_mixin.py:104} INFO - Running <TaskInstance: procesador_tabla_ot_estructurada_nav.extraer 2021-06-15T11:29:30.550613+00:00 [running]> on host 6796ffa7a616
[2021-06-15 11:29:32,815] {taskinstance.py:1280} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=procesador_tabla_ot_estructurada_nav
AIRFLOW_CTX_TASK_ID=extraer
AIRFLOW_CTX_EXECUTION_DATE=2021-06-15T11:29:30.550613+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-06-15T11:29:30.550613+00:00
**[2021-06-15 11:29:53,001] {python.py:151} INFO - Done. Returned value was: LITERALLY 84 MB OF A PANDAS DATAFRAME**
[2021-06-15 11:30:14,401] {taskinstance.py:1184} INFO - Marking task as SUCCESS. dag_id=procesador_tabla_ot_estructurada_nav, task_id=extraer, execution_date=20210615T112930, start_date=20210615T112932, end_date=20210615T113014
[2021-06-15 11:30:14,447] {taskinstance.py:1245} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2021-06-15 11:30:14,508] {local_task_job.py:151} INFO - Task exited with return code 0
1 Answer


If you are looking for the task syntax that wraps this operator, you can use @task(show_return_value_in_logs=False) or @task.virtualenv(show_return_value_in_logs=False) - Matthias
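A minimal DAG sketch of that suggestion, assuming an Airflow release whose `@task` decorator forwards `show_return_value_in_logs` to `PythonOperator` (check the docs for your installed version; the DAG and task names here are illustrative):

```python
# Sketch only: requires Airflow installed; not runnable standalone.
import pandas as pd
from airflow.decorators import dag, task

@dag(schedule=None, catchup=False)
def procesador_tabla():
    @task(show_return_value_in_logs=False)
    def extraer() -> pd.DataFrame:
        # The DataFrame still travels through XCom as before,
        # but is no longer echoed into the task log on success.
        return pd.DataFrame({"col": range(10)})

    @task
    def transformar(df: pd.DataFrame):
        print(len(df))

    transformar(extraer())

procesador_tabla()
```

With the flag set, the "Done. Returned value was: ..." line in the task log is suppressed for that task only; it does not change how the value is stored in XCom.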
