Luigi flexible pipeline and passing parameters all

Posted 2019-08-05 06:08

Question:

I've recently implemented a luigi pipeline to handle the processing for one of our bioinformatics pipelines. However, there's something fundamental about how to set up these tasks that I'm not grasping.

Let's say I've got a chain of three tasks that I'd like to be able to run with multiple workers. For example, the dependency graph for three workers might look like:

/ taskC -> taskB -> taskA
- taskC -> taskB -> taskA
\ taskC -> taskB -> taskA

and I might write

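import os

import luigi

class entry(luigi.Task):

    in_dir = luigi.Parameter()

    def requires(self):
        # in_dir is a directory path (a string), so list its contents;
        # iterating over the string itself would yield single characters
        for f in os.listdir(self.in_dir):
            yield taskC(pass_through=os.path.join(self.in_dir, f))

    def run(self):
        # some logic using the .path of each target in self.input(),
        # one target per taskC yielded above
        pass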

class taskA(luigi.Task):

    in_file_A = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outA.txt')

    def run(self):
        # some logic generating outA.txt
        pass

class taskB(luigi.Task):

    pass_through = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outB.txt')

    def requires(self):
        return taskA(in_file_A=self.pass_through)

    def run(self):
        # some logic using self.input().path [outA.txt]
        # and generating self.output().path [outB.txt]
        pass

class taskC(luigi.Task):

    pass_through = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outC.txt')

    def requires(self):
        return taskB(pass_through=self.pass_through)

    def run(self):
        # some logic using self.input().path [outB.txt]
        # and generating self.output().path [outC.txt]
        pass

If my code lives in pipeline.py I might launch this with:

luigi --module pipeline entry --workers 3 --in-dir some_dir_w_input_files/

The fact that I'm sending parameter pass_through all the way through to taskA doesn't feel like the right approach. Furthermore, if sometime in the future I already have the data generated (separately) by taskA, taskB isn't flexible enough to handle that situation. Perhaps I might write:

class taskB(luigi.Task):

    # both default to empty so that either one can be supplied on its own
    in_file_B = luigi.Parameter(default='') # if we already have the output of taskA
    pass_through = luigi.Parameter(default='') # if we require taskA

    def output(self):
        return luigi.LocalTarget('outB.txt')

    def requires(self):
        if self.pass_through:
            return taskA(in_file_A=self.pass_through)
        return []  # no upstream task when in_file_B is given

    def run(self):
        # input() only holds a target when taskA was actually required
        if self.pass_through:
            logic_input = self.input().path
        else:
            logic_input = self.in_file_B

        # some logic using 'logic_input'
        # and generating self.output().path [outB.txt]

I'd like to know if this is the 'proper' design pattern for Luigi or if I'm completely off base.

Answer 1:

I think this is largely an artifact of the abstract tasks you have here. In the real world, you probably need to know at each step where you are reading from and writing to, so passing the parameters down the chain is expected. See for example:

import os

import luigi

# decompress() and translate() are assumed to be defined elsewhere


class DecompressTask(luigi.Task):
    dirname = luigi.Parameter()
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(os.path.join(self.dirname, self.filename + ".txt"))

    def run(self):
        decompress(os.path.join(self.dirname, self.filename + ".gz"),
                   os.path.join(self.dirname, self.filename + ".txt"))


class TranslateTask(luigi.Task):
    dirname = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        return DecompressTask(dirname=self.dirname, filename=self.filename)

    def output(self):
        return luigi.LocalTarget(os.path.join(self.dirname, self.filename + ".translated"))

    def run(self):
        translate(os.path.join(self.dirname, self.filename + ".txt"),
                  os.path.join(self.dirname, self.filename + ".translated"))


class ProcessDirectory(luigi.WrapperTask):
    dirname = luigi.Parameter()

    def requires(self):
        tasks = []
        for file_name in os.listdir(self.dirname):
            if file_name.endswith(".gz"):
                # strip only the trailing ".gz" so names containing other dots survive
                prefix = file_name[:-len(".gz")]
                tasks.append(TranslateTask(filename=prefix, dirname=self.dirname))
        return tasks
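
The reason the parameters have to travel down the chain is that, by default, Luigi considers a task complete once its output target exists. If every chain wrote to the same fixed name such as outA.txt, the first chain to finish would make the others look done. Deriving each output path from the task's parameters avoids that. Here is a minimal sketch of that path derivation in plain Python, without Luigi itself; stage_paths and plan_directory are hypothetical helper names used only for illustration, and the file names are made up:

```python
import os


def stage_paths(dirname, file_name):
    """Return the path used at each stage for one '.gz' input file."""
    prefix = file_name[:-len(".gz")]
    base = os.path.join(dirname, prefix)
    return {
        "compressed": base + ".gz",
        "decompressed": base + ".txt",
        "translated": base + ".translated",
    }


def plan_directory(dirname, listing):
    """Map every '.gz' name in listing to its own set of stage paths."""
    return {
        name: stage_paths(dirname, name)
        for name in listing
        if name.endswith(".gz")
    }


# Hypothetical directory listing: two inputs and one unrelated file.
plan = plan_directory("data", ["s1.gz", "s2.gz", "notes.md"])

# Each input gets its own output per stage, so no two chains share a target.
translated = [paths["translated"] for paths in plan.values()]
assert len(set(translated)) == len(plan) == 2
```

This is the same mapping that ProcessDirectory, TranslateTask and DecompressTask perform through their dirname and filename parameters: one (dirname, filename) pair identifies one chain and all of its files.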