I am using luigi to execute a chain of tasks, like so:
class Task1(luigi.Task):
stuff = luigi.Parameter()
def output(self):
return luigi.LocalTarget('test.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)
class Task2(luigi.Task):
stuff = luigi.Parameter()
def requires(self):
return Task1(stuff=self.stuff)
def output(self):
return luigi.LocalTarget('something-else.json')
def run(self):
with self.output().open('w') as f:
f.write(stuff)
This works exactly as desired when I start the entire workflow like so:
luigi.build([Task2(stuff='stuff')])
When using luigi.build
you can also run multiple tasks by explicitly passing arguments, as per this example in the documentation.
However, in my situation, I would also like to be able to run the business logic of Task2
completely independently of it's involvement in the workflow. This works fine for tasks that do not implement requires
, as per this example.
My question is, how can I run this method both as part of the workflow, as well as on it's own? Obviously, I could just add a new private method like _my_custom_run
, which takes the data and returns the result, and then use this method in run
, but it just feels like something that should be baked into the framework, so it makes me feel like I am misunderstanding Luigi's best practices (still learning the framework). Any advice is appreciated, thanks!
It sounds like you want dynamic requirements. Using the pattern shown in that example, you could read a config or pass a parameter with arbitrary data, and yield
only the tasks that you want to require based on the fields in the config.
# tasks.py
import luigi
import json
import time
class Parameterizer(luigi.Task):
params = luigi.Parameter() # Arbitrary JSON
def output(self):
return luigi.LocalTarget('./config.json')
def run(self):
with self.output().open('w') as f:
json.dump(params, f)
class Task1(luigi.Task):
stuff = luigi.Parameter()
def output(self):
return luigi.LocalTarget('{}'.format(self.stuff[:6]))
def run(self):
with self.output().open('w') as f:
f.write(self.stuff)
class Task2(luigi.Task):
stuff = luigi.Parameter()
params = luigi.Parameter()
def output(self):
return luigi.LocalTarget('{}'.format(self.stuff[6:]))
def run(self):
config = Parameterizer(params=self.params)
yield config
with config.output().open() as f:
parameters = json.load(f)
if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
pass
with self.output().open('w') as f:
f.write(self.stuff)
if __name__ == '__main__':
cf_json = '{"runTask1": True}'
print("Trying to run with Task1...")
luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)
time.sleep(10)
cf_json = '{"runTask1": False}'
print("Trying to run WITHOUT Task1...")
luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)
(This is executed by simply calling python tasks.py
)
We could easily imagine mapping more than one parameter to more than one task, or applying custom tests before allowing various tasks to execute. We could also rewrite this to take the params from luigi.Config
.
Also note the following control flow from Task2
:
if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
pass
Here we could run an alternative task, or dynamically call tasks as we saw in the example from the luigi
repo. For example:
if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
# self.stuff is not automatically parsed to int, so this list comp is valid
data_dependent_deps = [Task1(stuff=x) for x in self.stuff]
yield data_dependent_deps
This may be a bit more involved than a simple run_standalone()
method, but I think it's the closest thing to what you were looking for in the documented luigi patterns.
Source: https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies