Can't pickle : attribute lookup class_n

2020-04-12 01:06发布

问题:

I'm getting the above error as I try to create dependencies (subtasks) based on dependency relationship defined in a dictionary ("cmdList). For instance, "BDX010" is a dependency of "BDX020". I'm using Python 3.7.

Please see the stack trace at the bottom for the exact error message.

import luigi
from helpers import SQLTask
import helpers
import logging 
import time

acctDate = '201904'
ssisDate = '201905'
runDesc0xx = 'prod period 4 test2'  
runDesc9xx = 'test2'  

YY = acctDate[:4]
MM = acctDate[4:6]


bdx_sql = 'r:\\1.SQL\\BDX_SQL\\'
cmdList = {
        'BDX010': (f'"{bdx_sql}BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 ',''),
        'BDX020': (f'"{bdx_sql}BDX_001_NI_DM 020.sql"  ','BDX010'),
        'BDX022a': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql"  -S LWVPDBSQLC070 ','BDX020'),
        'BDX022b': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql"  -S LWVPDBSQLC070  -v Year1={YY} MM={MM}','BDX022a'),
        'BDX022c': (f'"{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql"  -v Year={YY} Month={MM}', 'BDX022b'),
    }

class BDX_Task(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()


    def __init__(self, *args, **kwargs):
        super(BDX_Task, self).__init__(*args, **kwargs)
        self.trans_id = f"00903_BDX_Query_{self.queryKey}__{self.runDesc}"

    def requires(self):
        cmdListComb = dict(cmdList)

        if self.dependQry != '' and self.dependQry in cmdListComb:
            dep_cmd, dep_dep_key = cmdListComb[self.dependQry]

            klass = globals()[self.dependQry]
            return [klass(         
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = self.dependQry,
                queryCmd = dep_cmd,
                runDesc = self.runDesc,
                dependQry = dep_dep_key
            )]
        else:
            return []

    def run(self):

        strQuery_and_args = f""" -i {self.queryCmd} """
        time.sleep(5)
        print(strQuery_and_args)
        self.get_target().touch()


class BDX_Query_0XX(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()  
    runDesc = luigi.Parameter()


    def __init__(self, *args, **kwargs):
        super(BDX_Query_0XX, self).__init__(*args, **kwargs)

        self.trans_id =  "00902_BDX_Query_0XX" + "__" + self.runDesc  # static.


    def requires(self):
        for queryKey, (queryCmd, dependQry) in cmdList.items():
            klass = type(queryKey, (BDX_Task,),{})
            globals()[queryKey] = klass
            yield klass(
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = queryKey,
                queryCmd = queryCmd,
                runDesc = self.runDesc,  
                dependQry = dependQry
            )

    def run(self):
        self.get_target().touch()



class BDX_Query_Main(SQLTask):
    acctDate = luigi.Parameter(default=acctDate)
    ssisDate = luigi.Parameter(default=ssisDate)  # one month lag/later than acctDate
    # runDesc = globals().runDesc

    trans_id = "09000_Metaclass test" + "__" + runDesc9xx  # static.

    def requires(self):
        YY = self.acctDate[:4]
        MM = self.acctDate[4:6]
        acctDate = self.acctDate
        ssisDate = self.ssisDate

        return [BDX_Query_0XX( acctDate=self.acctDate, ssisDate = self.ssisDate, runDesc = runDesc0xx )
               ]

    def run(self):
        self.get_target().touch()


if __name__ == '__main__':
    luigi.run()

Stack trace:

DEBUG: Checking if BDX_Query_Main(acctDate=201904, ssisDate=201905) is complete
DEBUG: Checking if BDX_Query_0XX(acctDate=201904, ssisDate=201905, runDesc=prod period 4 test2) is complete
INFO: Informed scheduler that task   BDX_Query_Main_201904_201905_444c47aebc   has status   PENDING
DEBUG: BDX_Task.__init__ called for queryKey ="BDX010"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX020"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022a"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022b"
DEBUG: BDX_Task.__init__ called for queryKey ="BDX022c"
DEBUG: Checking if BDX010(acctDate=201904, ssisDate=201905, queryKey=BDX010, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 010.sql" -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=) is complete
DEBUG: Checking if BDX020(acctDate=201904, ssisDate=201905, queryKey=BDX020, queryCmd="r:\1.SQL\BDX_SQL\BDX_001_NI_DM 020.sql"  , runDesc=prod period 4 test2, dependQry=BDX010) is complete
DEBUG: Checking if BDX022a(acctDate=201904, ssisDate=201905, queryKey=BDX022a, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 010.sql"  -S LWVPDBSQLC070 , runDesc=prod period 4 test2, dependQry=BDX020) is complete
DEBUG: Checking if BDX022b(acctDate=201904, ssisDate=201905, queryKey=BDX022b, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 020.sql"  -S LWVPDBSQLC070  -v Year1=2019 MM=04, runDesc=prod period 4 test2, dependQry=BDX022a) is complete
DEBUG: Checking if BDX022c(acctDate=201904, ssisDate=201905, queryKey=BDX022c, queryCmd="r:\1.SQL\BDX_SQL\BDX_022_P038_All_Final_CatAdj 030.sql"  -v Year=2019 Month=04, runDesc=prod period 4 test2, dependQry=BDX022b) is complete
INFO: Informed scheduler that task   BDX_Query_0XX_201904_prod_period_4_te_201905_73ccfa7be3   has status   PENDING
INFO: Informed scheduler that task   BDX022c_201904_BDX022b__r__1_SQL_BDX_SQ_5c6660ab25   has status   PENDING
INFO: Informed scheduler that task   BDX022b_201904_BDX022a__r__1_SQL_BDX_SQ_c0677e7954   has status   PENDING
INFO: Informed scheduler that task   BDX022a_201904_BDX020__r__1_SQL_BDX_SQ_784cf5b40a   has status   PENDING
INFO: Informed scheduler that task   BDX020_201904_BDX010__r__1_SQL_BDX_SQ_d37e4e46a2   has status   PENDING
INFO: Informed scheduler that task   BDX010_201904___r__1_SQL_BDX_SQ_9d353a8cd3   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 5 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 7
INFO: Worker Worker(salt=751624561, workers=5, host=LWVPWEACT001, username=i805649, pid=4108) was stopped. Shutting down Keep-Alive thread
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1664, in <module>
    main()
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "R:/1.PY/DataPipeLine/run_test.py", line 178, in <module>
    luigi.run()
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 192, in run
    return _run(*args, **kwargs)['success']
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 209, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\interface.py", line 172, in _schedule_and_run
    success &= worker.run()
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 1184, in run
    self._run_task(get_work_response.task_id)
  File "C:\ProgramData\Anaconda3\lib\site-packages\luigi\worker.py", line 996, in _run_task
    task_process.start()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
**_pickle.PicklingError: Can't pickle <class 'abc.BDX010'>: attribute lookup BDX010 on abc failed**

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

回答1:

When creating classes dynamically with a meta-class of ABC, the module becomes abc, and when a worker tries to find the task it goes to the abstract base class module and tries to find it there, but of course it does not exist.

To solve this, make sure luigi know where to find the code that build the class by manually resetting the __module__ variable.

Change the line to:

klass = type(queryKey, (BDX_Task,),{'__module__':__name__})

As far as I know, this is only a problem on Windows.

Edit: Sorry, stupid of me. Just must also make sure all custom classes gets re-created and added if a new process just imports the module.

# Run this first outside any other logic so it gets run if someone imports the module:
for queryKey in cmdList.keys():
    globals()[queryKey] = type(queryKey,(BDX_Task,){'__module__':__name__})

#Then you requires function can look like:
class BDX_Query_0XX(SQLTask):

    # ...

    def requires(self):
        for queryKey, (queryCmd, dependQry) in cmdList.items():
            yield globals()[queryKey](
                acctDate = self.acctDate,
                ssisDate = self.ssisDate,
                queryKey = queryKey,
                queryCmd = queryCmd,
                runDesc = self.runDesc,  
                dependQry = dependQry
            )


标签: python luigi