I have a simple pipeline.
I want to start it once with the id 2381 and then, while that first run is still in progress, start a second run with the id 231. The first run completes as expected.
The second run returns this response:
Pid(s) set([10362]) already running
Process finished with exit code 0
I am starting the runs like this.
Run one:
luigi.run(
    cmdline_args=["--id='newId13822'", "--TaskTwo-id=2381"],
    main_task_cls=TaskTwo()
)
Run two:
luigi.run(
    cmdline_args=["--id='newId1322'", "--TaskTwo-id=231"],
    main_task_cls=TaskTwo()
)
Each task has a unique ID, as generated by luigi's task_id_str(...) method. Why does luigi think the task is already running when the luigi.Parameter values, the TaskTwo-id and the MockTarget files are all different?
Pipeline code:
import time
import uuid

import luigi
from luigi.mock import MockTarget


class TaskOne(luigi.Task):
    run_id = luigi.Parameter()

    def output(self):
        return MockTarget("TaskOne{0}".format(self.run_id), mirror_on_stderr=True)

    def run(self):
        _out = self.output().open('w')
        time.sleep(10)  # simulate a long-running job
        _out.write(u"Hello World!\n")
        _out.close()


class TaskTwo(luigi.Task):
    # Note: this default is evaluated once, when the class is defined,
    # not once per run.
    id = luigi.Parameter(default=str(uuid.uuid4()))

    def output(self):
        return MockTarget("TaskTwo{0}".format(self.id), mirror_on_stderr=True)

    def requires(self):
        return TaskOne(self.id)

    def run(self):
        _out = self.output().open('w')
        time.sleep(10)  # simulate a long-running job
        _out.write(u"Hello World!\n")
        _out.close()
It looks like this might be because you are not connecting to a scheduler server, so it is trying to start a scheduler process twice. Are you running luigid?
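If you are not sure whether luigid is up, a quick check is to see whether anything accepts connections on the scheduler port. The sketch below assumes the scheduler is on localhost at luigid's default port, 8082; the function name is just illustrative. It only tells you that something is listening there, not that it is definitely luigid.

```python
import socket


def scheduler_is_reachable(host="localhost", port=8082, timeout=1.0):
    """Return True if something accepts TCP connections on host:port.

    Assumption: luigid is running with its default port (8082).
    """
    try:
        # create_connection raises OSError if nothing is listening
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling scheduler_is_reachable() before starting the runs gives a rough indication of whether they will have a central scheduler to talk to.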
I was able to get your code to run at the command line as follows. First I created a dir and dropped your code in a file called luigitest.py (minus the luigi.run() commands). I changed directory into the directory I created. Then I ran:
Then I opened up a second terminal in the same directory. In the first one I ran:
In the second one I ran (about a second later):
Both runs output "Hello World!".
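The exact commands are not reproduced in this post, so the following is only a sketch of the same idea driven from Python rather than from two terminals. It assumes luigid is already running, that the `luigi` command-line entry point is on your PATH, and that the tasks live in luigitest.py as described; the helper names are made up for illustration.

```python
import os
import subprocess


def build_luigi_command(task_id, module="luigitest", task="TaskTwo"):
    """Build the argv for one run of the task through the luigi CLI."""
    return ["luigi", "--module", module, task, "--id", str(task_id)]


def launch_runs(task_ids, workdir="."):
    """Start one luigi process per id, then wait for all of them.

    Assumption: workdir contains luigitest.py, so it is put on PYTHONPATH
    to make the module importable by the luigi command.
    """
    env = dict(os.environ)
    paths = [os.path.abspath(workdir), env.get("PYTHONPATH")]
    env["PYTHONPATH"] = os.pathsep.join(p for p in paths if p)
    # Popen returns immediately, so the runs overlap in time
    procs = [
        subprocess.Popen(build_luigi_command(i), cwd=workdir, env=env)
        for i in task_ids
    ]
    return [p.wait() for p in procs]
```

With the scheduler up, launch_runs([2381, 231]) would start both runs side by side and return their exit codes.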