我正在使用django作为Web框架。我需要一个工作流引擎,可以执行同步和异步(批量任务)的任务链。我找到了celery和luigi这两个批处理工作流程模块。我的第一个问题是这两个模块之间有什么区别。 Luigi允许我们重新运行失败的任务链,只有失败的子任务会被重新执行。那么Celery呢:...
我试图以非常简单的方式学习如何使用Luigi。作为一个新手,我编写了这段代码。import luigi class class1(luigi.Task): def requires(self): return class2() def output(self): ...
据我所知,luigi.Target要么存在,要么不存在。因此,如果luigi.Target存在,则它不会被重新计算。 我正在寻找一种方法,如果其依赖项中的任何一个被修改,或者任务代码发生更改,则强制重新计算该任务。
我的初始文件位于 AWS S3。有人能指点我如何在 Luigi Task 中设置这个吗?我查阅了文档,找到了 luigi.S3,但我不清楚该怎么做,然后我在网上搜索,只找到了来自 mortar-luigi 的链接和在 luigi 顶部实现的实现。更新按照 @matagus 提供的示例(我也按建...
我在我的项目中,尝试在 Luigi 流水线中编写一个二进制 LocalTarget,但遇到了困难。我在这里找到了问题所在:class LuigiTest(luigi.Task): def output(self): return luigi.LocalTarget('t...
目前,我有一堆luigi任务排队在一起,形成一个简单的依赖链(a -> b -> c -> d)。 d最先执行,a最后执行。触发任务的是任务a。 除了a,所有的目标都返回一个luigi.LocalTarget()对象,并且只有一个通用的luigi.Parameter(),其...
我有一个Luigi任务,执行一些不稳定的计算。可以将其视为一种优化过程,有时无法收敛。import luigi MyOptimizer(luigi.Task): input_param: luigi.Parameter() output_filename = luigi.Pa...
我正在使用Spotify的Luigi来构建自然语言处理任务流水线,并且使用Python 3.6编写了我的第一个项目。 我注意到Task类的output()函数总是返回某种类型的Target对象,该对象只是某个文件,无论是本地还是远程位置。因为我的任务生成了更复杂的数据结构,例如解析树,在将其...
我是luigi的新手,是在设计我们的机器学习流水线时遇到了它。虽然它不符合我的特定用例,但它有很多额外的功能,所以我决定让它适应我的项目。 基本上我正在寻找的是一种方法来持久化自定义流水线,从而使其结果可重复并更容易部署,在阅读大多数在线教程后,我尝试使用现有的luigi.cfg配置和命令行...
我定义了三个任务T1,T2和T3,然后定义了一个任务T4如下: class T4(luigi.Task) def requires(self): return [T1(), T2(), T3()] 有没有一种自然的方式告诉Luigi我希望这些任务T1、T2和T3可...