I am using luigi to execute a chain of tasks, like so:
class Task1(luigi.Task):
stuff = luigi.Parameter()
def output(self):
return luigi.LocalTarget('test.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)
class Task2(luigi.Task):
stuff = luigi.Parameter()
def requires(self):
return Task1(stuff=self.stuff)
def output(self):
return luigi.LocalTarget('something-else.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)
This works exactly as desired when I start the entire workflow like so:
luigi.build([Task2(stuff='stuff')])
When using luigi.build
you can also run multiple tasks by explicitly passing arguments, as per this example in the documentation.
However, in my situation, I would also like to be able to run the business logic of Task2
completely independently of it's involvement in the workflow. This works fine for tasks that do not implement requires
, as per this example.
My question is, how can I run this method both as part of the workflow, as well as on it's own? Obviously, I could just add a new private method like _my_custom_run
, which takes the data and returns the result, and then use this method in run
, but it just feels like something that should be baked into the framework, so it makes me feel like I am misunderstanding Luigi's best practices (still learning the framework). Any advice is appreciated, thanks!
It sounds like you want dynamic requirements. Using the pattern shown in that example, you could read a config or pass a parameter with arbitrary data, and
yield
only the tasks that you want to require based on the fields in the config.(This is executed by simply calling
python tasks.py
)We could easily imagine mapping more than one parameter to more than one task, or applying custom tests before allowing various tasks to execute. We could also rewrite this to take the params from
luigi.Config
.Also note the following control flow from
Task2
:Here we could run an alternative task, or dynamically call tasks as we saw in the example from the
luigi
repo. For example:This may be a bit more involved than a simple
run_standalone()
method, but I think it's the closest thing to what you were looking for in the documented luigi patterns.Source: https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies