I tried to design a workflow using the composite pattern; the sample code looks like this:
class CommandInterface(object):
    """Base class for a single step that can be invoked.

    The base implementations of :meth:`before_invoke`, :meth:`invoke` and
    :meth:`after_invoke` do nothing; subclasses override them.

    Args:
        name: Identifier of the command.
        template: Stored as ``self.template``; its meaning is defined by the
            subclass.
        tool: Stored as ``self.tool``.
        param: Parameter mapping stored as ``self.param``. A fresh empty dict
            is created when it is omitted.
        input: Stored as ``self.input``.
        output: Stored as ``self.output``.
    """

    def __init__(self, name, template=None, tool=None, param: dict = None,
                 input=None, output=None):
        self.name = name
        self.input = input
        self.output = output
        # A ``{}`` default is evaluated once at definition time and would be
        # shared (and mutated) by every command created without ``param``.
        self.param = {} if param is None else param
        self.template = template
        # Keep ``tool`` so the argument is not silently discarded.
        self.tool = tool

    def before_invoke(self):
        """Hook for checks before :meth:`invoke`; does nothing by default."""
        pass

    def invoke(self):
        """Run the command; does nothing by default."""
        pass

    def after_invoke(self):
        """Hook for checks after :meth:`invoke`; does nothing by default."""
        pass
class ShellCommand(CommandInterface):
    """Command whose template is a shell command line run via ``os.system``."""

    def render(self):
        """Return the template formatted with ``input``, ``output`` and ``param``."""
        fields = {
            "input": self.input,
            "output": self.output,
            "param": self.param,
        }
        return self.template.format(**fields)

    def before_invoke(self):
        """Perform no check before invoking."""
        return None

    def invoke(self):
        """Render the command line and pass it to ``os.system``."""
        command_line = self.render()
        os.system(command_line)
class Workflow(CommandInterface):
    """Composite command that invokes its child commands in insertion order.

    Args:
        name: Identifier of the workflow.
    """

    def __init__(self, name):
        # Run the base initialiser so a workflow carries the same attributes
        # (input, output, param, template) as any other command. A fresh dict
        # is passed so the parameter mapping is never shared between objects.
        super().__init__(name, param={})
        self._input = []
        self._output = []
        self._commands = []
        self._tool = []

    def add(self, command: CommandInterface):
        """Append ``command`` to the workflow and return ``self`` for chaining."""
        self._commands.append(command)
        return self

    def invoke(self):
        """Invoke every added command, in the order it was added."""
        for command in self._commands:
            command.invoke()
It can be used like this:
# Take the head of every raw file, then concatenate the results into "merged".
# ``Workflow`` requires a name.
workflow = Workflow("head_and_merge")
raw_files = ["file1", "file2", "file3"]
for raw_file in raw_files:
    head_command = ShellCommand(
        # The first positional parameter is the command name, so the shell
        # template has to be passed by keyword.
        "head_" + raw_file,
        # Redirect into the output file so the merge step below can read it.
        # NOTE(review): the original template was "head {input} {output}",
        # which never creates the output file; confirm the redirect is intended.
        template="head {input} > {output}",
        input=raw_file,
        output=raw_file + "_head",
    )
    workflow.add(head_command)
merge_command = ShellCommand(
    "merge",
    template="cat {input} > {output}",
    input=" ".join(raw_file + "_head" for raw_file in raw_files),
    output="merged",
)
workflow.add(merge_command)
workflow.invoke()
However, sometimes a command's input might be another command's result, for example:
class PythonCommand(CommandInterface):
    """Command whose template is a Python callable."""

    def invoke(self):
        """Call the template and keep its return value in ``self._result``."""
        func = self.template
        call_kwargs = {
            "input": self.input,
            "output": self.output,
            "param": self.param,
        }
        self._result = func(**call_kwargs)
def a_command(input, output, param):
    """Run ``take_long_time`` and return a dict holding ``"a_result"``."""
    take_long_time(input, output, param)
    result = {"a_result": "some result will be used in b_command"}
    return result
def b_command(input, output, param):
    """Look up ``param["a_result"]`` and return ``"success"``.

    The looked-up value is discarded; a missing key raises ``KeyError``.
    """

    def _read_a_result(_source, _target, options):
        # Stand-in for work that depends on the value under "a_result".
        return options["a_result"]

    _read_a_result(input, output, param)
    return "success"
# The first positional parameter of the constructor is the command name, so
# the callable goes in ``template`` and the remaining values are passed by
# keyword instead of landing in the name/template/tool slots.
a = PythonCommand("a", template=a_command, input=input_, output=output_, param=param_)
a.invoke()
# ``a._result`` only exists once ``a.invoke()`` has run.
b = PythonCommand("b", template=b_command, input=input_, output=output_, param=a._result)
b.invoke()
I can't use a workflow to compose these two commands because `b` uses `a`'s result as a parameter, which is only available after `a` is invoked. However, a workflow manager like the code above is still necessary in some situations. I think I need something like this:
# Desired usage: ``lazy()`` is the not-yet-implemented API this question asks
# about; it should defer reading ``a._result`` until ``a`` has been invoked.
# ``Workflow`` requires a name.
workflow = Workflow("a_then_b")
# The first positional parameter is the command name, so the callable is
# passed as ``template`` and the remaining values by keyword.
a = PythonCommand("a", template=a_command, input=input_, output=output_, param=param_)
workflow.add(a)
b = PythonCommand(
    "b",
    template=b_command,
    input=input_,
    output=output_,
    param={"a_result": a.lazy()._result},
)
workflow.add(b)
workflow.invoke()
Does anyone have ideas about how to design a workflow with a `.lazy()` implementation like this?