作者:龙井龙井2502908921 | 来源:互联网 | 2022-12-19 17:42
我正在使用luigi执行一系列任务,如下所示:
class Task1(luigi.Task):
stuff = luigi.Parameter()
def output(self):
return luigi.LocalTarget('test.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)
class Task2(luigi.Task):
stuff = luigi.Parameter()
def requires(self):
return Task1(stuff=self.stuff)
def output(self):
return luigi.LocalTarget('something-else.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)
当我像这样开始整个工作流程时,这完全符合要求:
luigi.build([Task2(stuff='stuff')])
使用时,luigi.build
您还可以通过显式传递参数来运行多个任务,如文档中的此示例所示.
但是,在我的情况下,我还希望能够Task2
完全独立于其参与工作流程来运行业务逻辑.根据此示例requires
,这适用于未实现的任务.
我的问题是,我如何将这种方法既作为工作流程的一部分,又作为工作流程的一部分运行?显然,我可以添加一个新的私有方法_my_custom_run
,它获取数据并返回结果,然后使用这个方法run
,但它只是感觉像应该被融入框架的东西,所以它让我觉得我是误解Luigi的最佳实践(仍在学习框架).任何建议表示赞赏,谢谢!