你有没有考虑在可调用函数周围使用装饰器/高阶函数?
我正在考虑使用类似以下的东西:
def conf_task_id_skip(python_callable):
def skip_if_configured(*args, **context):
task_id = context["task_id"]
dag_run = context["dag_run"]
skip_task_ids = dag_run.conf.get("skip_task_ids", [])
if skip_task_ids and task_id in skip_task_ids:
return None
else:
return python_callable(*args, **context)
return skip_if_configured
PythonOperator(
task_id="task_id",
python_callable=conf_task_id_skip(task_callable)
)
然后,如果我想要的话,我可以手动跳过我想要跳过的任务(并仍然成功)。
如果您希望如此,您还可以通过添加检查来增加鲁棒性,以确保是否禁止跳过(例如在生产环境中):
def conf_task_id_skip(python_callable):
def skip_if_configured(*args, **context):
if Variable.get("disallow_conf_task_id_skip"):
return python_callable(*args, **context)
task_id = context["task_id"]
dag_run = context["dag_run"]
skip_task_ids = dag_run.conf.get("skip_task_ids", [])
if skip_task_ids and task_id in skip_task_ids:
return None
else:
return python_callable(*args, **context)
return skip_if_configured
ShortCircuitOperator
或BranchPythonOperator
与TriggerDagRunOperator
相结合来链接它们。这种方法仍然存在的问题是,如果您仍有更多部分需要在这些小型顶级DAG之后运行,那么您将需要像ExternalTaskSensor
这样的东西来等待完成这些小型DAG以触发它们。难看的。 - y2k-shubhamenv-var
(动态地)修改dag的方法,我们没有找到跳过airflow任务的方法,但我们意识到可以基于env-var
创建一个dag。我们所有的任务基本上都是相同的,因此我们根据保存在env-var
中的任务列表创建了一个循环来创建它们。然后,当我们想要跳过某些任务时,我们使用图形算法修改该变量。希望这有所帮助。 - Pablo