I've recently implemented a Luigi pipeline to handle the processing for one of our bioinformatics workflows. However, there's something fundamental about how to set up these tasks that I'm not grasping.
Let's say I've got a chain of three tasks that I'd like to be able to run with multiple workers. For example, the dependency graph for three workers might look like:
/ taskC -> taskB -> taskA
- taskC -> taskB -> taskA
\ taskC -> taskB -> taskA
and I might write:

    import os

    import luigi


    class entry(luigi.Task):
        in_dir = luigi.Parameter()

        def requires(self):
            # one taskC chain per file found in the input directory
            for f in sorted(os.listdir(self.in_dir)):
                yield taskC(pass_through=os.path.join(self.in_dir, f))

        def run(self):
            # some logic using the .path of each target in self.input(),
            # one target per taskC yielded above
            pass


    class taskA(luigi.Task):
        in_file_A = luigi.Parameter()

        def output(self):
            return luigi.LocalTarget('outA.txt')

        def run(self):
            # some logic generating outA.txt
            pass


    class taskB(luigi.Task):
        pass_through = luigi.Parameter()

        def output(self):
            return luigi.LocalTarget('outB.txt')

        def requires(self):
            return taskA(in_file_A=self.pass_through)

        def run(self):
            # some logic using self.input().path [outA.txt]
            # and generating self.output().path [outB.txt]
            pass


    class taskC(luigi.Task):
        pass_through = luigi.Parameter()

        def output(self):
            return luigi.LocalTarget('outC.txt')

        def requires(self):
            return taskB(pass_through=self.pass_through)

        def run(self):
            # some logic using self.input().path [outB.txt]
            # and generating self.output().path [outC.txt]
            pass
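One detail matters once several chains run side by side: Luigi decides whether a task is complete by checking whether its output target exists, so fixed names such as outA.txt would be shared by every chain. A minimal sketch of deriving a per-input output name follows; derived_path and its arguments are hypothetical and not part of Luigi.

```python
import os


def derived_path(in_file, suffix, out_dir='.'):
    """Hypothetical helper: build an output path unique to one input file."""
    # Strip the directory and the extension, then append a task-specific suffix.
    stem = os.path.splitext(os.path.basename(in_file))[0]
    return os.path.join(out_dir, stem + suffix)
```

Inside taskA, output() could then return luigi.LocalTarget(derived_path(self.in_file_A, '.outA.txt')), so that each chain gets its own targets.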
If my code lives in pipeline.py, I might launch this with:

    luigi --module pipeline entry --workers 3 --in-dir some_dir_w_input_files/
The fact that I'm sending the parameter pass_through all the way through to taskA doesn't feel like the right approach. Furthermore, if sometime in the future I already have the data generated (separately) by taskA, taskB isn't flexible enough to handle that situation. Perhaps I might write:
    class taskB(luigi.Task):
        # default='' makes both parameters optional; supply exactly one
        in_file_B = luigi.Parameter(default='')     # if we already have the output of taskA
        pass_through = luigi.Parameter(default='')  # if we require taskA

        def output(self):
            return luigi.LocalTarget('outB.txt')

        def requires(self):
            if self.pass_through:
                return taskA(in_file_A=self.pass_through)
            return []  # no upstream task when in_file_B is supplied

        def run(self):
            if self.pass_through:
                logic_input = self.input().path
            else:
                logic_input = self.in_file_B
            # some logic using logic_input
            # and generating self.output().path [outB.txt]
I'd like to know if this is the 'proper' design pattern for Luigi or if I'm completely off base.