我有一个Luigi任务,执行一些不稳定的计算。可以将其视为一种优化过程,有时无法收敛。
import luigi
MyOptimizer(luigi.Task):
input_param: luigi.Parameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
optimize_something(self.input_param, self.output().path)
def output(self):
return luigi.LocalTarget(self.output_filename)
现在我想构建一个包装任务,该任务将使用不同的输入参数多次运行此优化器,并将选择第一次收敛的运行的输出。我现在的实现方式是不使用MyOptimizer,因为如果它失败,luigi将认为包装任务也已经失败了,但我可以接受一些MyOptimizer实例失败。
MyWrapper(luigi.Task):
input_params_list = luigi.ListParameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
for input_param in self.input_params_list:
try:
optimize_something(self.input_param, self.output().path)
print(f"Optimizer succeeded with input {input_param}")
break
except Exception as e:
print(f"Optimizer failed with input {input_param}. Trying again...")
def output(self):
return luigi.LocalTarget(self.output_filename)
问题在于这种方式,任务并没有并行化。此外,你可以想象`MyOptimizer`和`optimize_something`是复杂的任务,也参与由luigi处理的数据流水线,这在我的代码中几乎造成了混乱。我会欣赏任何有关如何以类似luigi的方式使其工作的见解和想法 :)