能否编写一个Luigi包装任务来容忍失败的子任务?

15

我有一个Luigi任务,执行一些不稳定的计算。可以将其视为一种优化过程,有时无法收敛。

import luigi

MyOptimizer(luigi.Task):
    input_param: luigi.Parameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        optimize_something(self.input_param, self.output().path)

    def output(self):
        return luigi.LocalTarget(self.output_filename)
现在我想构建一个包装任务,该任务将使用不同的输入参数多次运行此优化器,并将选择第一次收敛的运行的输出。
我现在的实现方式是不使用MyOptimizer,因为如果它失败,luigi将认为包装任务也已经失败了,但我可以接受一些MyOptimizer实例失败。
MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        for input_param in self.input_params_list:
            try:
                optimize_something(self.input_param, self.output().path)
                print(f"Optimizer succeeded with input {input_param}")
                break
            except Exception as e:
                print(f"Optimizer failed with input {input_param}. Trying again...")

    def output(self):
        return luigi.LocalTarget(self.output_filename)
问题在于这种方式,任务并没有并行化。此外,你可以想象`MyOptimizer`和`optimize_something`是复杂的任务,也参与由luigi处理的数据流水线,这在我的代码中几乎造成了混乱。
我会欣赏任何有关如何以类似luigi的方式使其工作的见解和想法 :)

2
这似乎与此相关 https://dev59.com/ArDla4cB1Zd3GeqP3SAZ - Maya Gershovitz Bar
1个回答

1

您能否让您的优化器始终输出一些内容?即使是一个空文件来表示失败,但在luigi中看起来是成功的?此外,请使用MyOptimizer的input_param在输出文件名中使文件名唯一。

然后:

MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        task_list = [MyOptimizer(input_param) for input_param in self.input_params_list]
        targets = yield task_list #executes tasks in parallel 
        for target in targets:
             ...do something to read and compare outputs
             some_data = some_read(target.path)
        ...write optimal solution

    def output(self):
        return luigi.LocalTarget(self.output_filename)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接