使用XCom在类之间交换数据?

4
我有一个DAG,它使用专门用于数据预处理例程的类执行不同的方法:
from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    from table_builder import OnlineOfflinePreprocess
else:
    print('Define MARKETING_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now(),
  'max_active_runs': 1,
  'concurrency': 4
}

worker = OnlineOfflinePreprocess()

DAG = DAG(
  dag_id='marketing_data_preproc',
  default_args=default_args,
  start_date=datetime.today()
)

import_online_data = PythonOperator(
  task_id='import_online_data',
  python_callable=worker.import_online_data,
  dag=DAG)

import_offline_data = PythonOperator(
  task_id='import_offline_data',
  python_callable=worker.import_offline_data,
  dag=DAG)

merge_aurum_to_sherlock = PythonOperator(
  task_id='merge_aurum_to_sherlock',
  python_callable=worker.merge_aurum_to_sherlock,
  dag=DAG)

merge_sherlock_to_aurum = PythonOperator(
   task_id='merge_sherlock_to_aurum',
   python_callable=worker.merge_sherlock_to_aurum,
   dag=DAG)

upload_au_to_sh = PythonOperator(
  task_id='upload_au_to_sh',
  python_callable=worker.upload_table,
  op_args='aurum_to_sherlock',
  dag=DAG)

upload_sh_to_au = PythonOperator(
  task_id='upload_sh_to_au',
  python_callable=worker.upload_table,
  op_args='sherlock_to_aurum',
  dag=DAG)

import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock

merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au

这会产生以下错误:

[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info'

考虑到airflow的工作原理,这其实是很明显的:不同类方法调用的输出结果并未存储在在图表顶部初始化的全局类对象中。

我能用XCom解决这个问题吗?总体上,如何将面向对象编程(OOP)与Airflow的协调性结合起来是一个需要思考的问题。

1个回答

8

在Airflow中,问题并不是关于面向对象编程(OOP),而是关于状态管理。

任何需要在任务之间传递的状态都需要被持久化存储。这是因为每个Airflow任务都是独立的进程(甚至可能在不同的计算机上运行!),因此内存通信是不可能的。

如果状态很小,您可以使用XCOM来传递它(因为XCOM将其存储在Airflow数据库中)。如果状态很大,则可能希望将其存储在其他地方,例如文件系统、S3、HDFS或专用数据库。


很好,有道理。我正在处理相当大的表格,所以在这里合理的选择应该是你提到的非本地存储选项之一。XCom似乎不适合传输GB级别的数据。只是想确保我没有错过任何显而易见的东西。干杯! - aaron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接