I'm getting the above error as I try to create dependencies (subtasks) based on dependency relationship defined in a dictionary ("cmdList). For instance, "BDX010" is a dependency of "BDX020". I'm using Python 3.7.
Please see the stack trace at the bottom for the exact error message.
import luigi
from helpers import SQLTask
import helpers
import logging
import time
acctDate = '201904'
ssisDate = '201905'
runDesc0xx = 'prod period 4 test2'
runDesc9xx = 'test2'
YY = acctDate[:4]
MM = acctDate[4:6]
bdx_sql = 'r:\\1.SQL\\BDX_SQL\\'
cmdList = {
'BDX010': (f'"{bdx_sql}BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ',''),
'BDX020': (f'"{bdx_sql}BDX_001_NI_DM 020.sql" ','BDX010'),
'BDX022a': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 ','BDX020'),
'BDX022b': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1={YY} MM={MM}','BDX022a'),
'BDX022c': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql" -v Year={YY} Month={MM}', 'BDX022b'),
}
class BDX_Task(SQLTask):
acctDate = luigi.Parameter()
ssisDate = luigi.Parameter(default=None)
queryKey = luigi.Parameter()
queryCmd = luigi.Parameter()
runDesc = luigi.Parameter()
dependQry = luigi.Parameter()
def __init__(self, *args, **kwargs):
super(BDX_Task, self).__init__(*args, **kwargs)
self.trans_id = f"00903_BDX_Query_{self.queryKey}__{self.runDesc}"
def requires(self):
cmdListComb = dict(cmdList)
if self.dependQry != '' and self.dependQry in cmdListComb:
dep_cmd, dep_dep_key = cmdListComb[self.dependQry]
klass = globals()[self.dependQry]
return [klass(
acctDate = self.acctDate,
ssisDate = self.ssisDate,
queryKey = self.dependQry,
queryCmd = dep_cmd,
runDesc = self.runDesc,
dependQry = dep_dep_key
)]
else:
return []
def run(self):
strQuery_and_args = f""" -i {self.queryCmd} """
time.sleep(5)
print(strQuery_and_args)
self.get_target().touch()
class BDX_Query_0XX(SQLTask):
acctDate = luigi.Parameter()
ssisDate = luigi.Parameter()
runDesc = luigi.Parameter()
def __init__(self, *args, **kwargs):
super(BDX_Query_0XX, self).__init__(*args, **kwargs)
self.trans_id = "00902_BDX_Query_0XX" + "__" + self.runDesc # static.
def requires(self):
for queryKey, (queryCmd, dependQry) in cmdList.items():
klass = type(queryKey, (BDX_Task,),{})
globals()[queryKey] = klass
yield klass(
acctDate = self.acctDate,
ssisDate = self.ssisDate,
queryKey = queryKey,
queryCmd = queryCmd,
runDesc = self.runDesc,
dependQry = dependQry
)
def run(self):
self.get_target().touch()
class BDX_Query_Main(SQLTask):
acctDate = luigi.Parameter(default=acctDate)
ssisDate = luigi.Parameter(default=ssisDate) # one month lag/later than acctDate
# runDesc = globals().runDesc
trans_id = "09000_Metaclass test" + "__" + runDesc9xx # static.
def requires(self):
YY = self.acctDate[:4]
MM = self.acctDate[4:6]
acctDate = self.acctDate
ssisDate = self.ssisDate
return [BDX_Query_0XX( acctDate=self.acctDate, ssisDate = self.ssisDate, runDesc = runDesc0xx )
]
def run(self):
self.get_target().touch()
if __name__ == '__main__':
luigi.run()
Stack trace:
DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=prod period 4 test2) is complete
INFO: Informed scheduler that task BDX_Query_Main_201904_201905_444c47aebc has status PENDING
DEBUG: BDX_Task.__init__ called for queryKey ="BDX010"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX020"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022a"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022b"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022c"
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=) is complete
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql" , runDesc=prod period 4 test2, dependQry=BDX010) is complete
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql" -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=BDX020) is complete
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql" -S LWVPDBSQLC070 -v Year1=2019 MM=04, runDesc=prod period 4 test2, dependQry=BDX022a) is complete
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql" -v Year=2019 Month=04, runDesc=prod period 4 test2, dependQry=BDX022b) is complete
INFO: Informed scheduler that task BDX_Query_0XX_201904_prod_period_4_te_201905_73ccfa7be3 has status PENDING
INFO: Informed scheduler that task BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_5c6660ab25 has status PENDING
INFO: Informed scheduler that task BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_c0677e7954 has status PENDING
INFO: Informed scheduler that task BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_784cf5b40a has status PENDING
INFO: Informed scheduler that task BDX020_201904_BDX010__r__1_SQL_BDX_SQ_d37e4e46a2 has status PENDING
INFO: Informed scheduler that task BDX010_201904___r__1_SQL_BDX_SQ_9d353a8cd3 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 5 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 7
INFO: Worker Worker(salt=751624561, workers=5, host=LWVPWEACT001, username=i805649, pid=4108) was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1664, in <module>
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "R:/1.PY/DataPipeLine/run_test.py", line 178, in <module>
luigi.run()
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 192, in run
return _run(*args, **kwargs)['success']
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 209, in _run
return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 172, in _schedule_and_run
success &= worker.run()
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 1184, in run
self._run_task(get_work_response.task_id)
File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 996, in _run_task
task_process.start()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 112, in start
self._popen = self._Popen(self)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
**_pickle.PicklingError: Can't pickle <class 'abc.BDX010'>: attribute lookup BDX010 on abc failed**
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
When creating classes dynamically with a meta-class of ABC, the module becomes abc, and when a worker tries to find the task it goes to the abstract base class module and tries to find it there, but of course it does not exist.
To solve this, make sure luigi know where to find the code that build the class by manually resetting the
__module__
variable.Change the line to:
As far as I know, this is only a problem on Windows.
Edit: Sorry, stupid of me. Just must also make sure all custom classes gets re-created and added if a new process just imports the module.