I am building a wrapper for Luigi tasks and ran into a snag: Luigi's Register class is actually an ABC metaclass, and the classes I create dynamically with type() are not picklable.
The following code is, more or less, what I'm using to build the dynamic class.

    import luigi
    from luigi.contrib.spark import PySparkTask


    class TaskWrapper(object):
        '''Luigi Spark Factory from the provided JobClass

        Args:
            JobClass(ScrubbedClass): The job to wrap
            options: Options as passed into the JobClass
        '''

        def __new__(cls, JobClass, **options):
            # Validate we have a good job: it must be a strict subclass
            # of one of the valid classes, not a valid class itself
            valid_classes = (
                ScrubbedClass01,
                # ScrubbedClass02,
                # ScrubbedClass03,
            )
            if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
                raise TypeError('Job is not the correct class: {}'.format(JobClass))

            # Build a luigi task class dynamically
            luigi_identifier = 'Task'
            job_name = JobClass.__name__
            job_name = job_name.replace('Pail', '')
            if not job_name.endswith(luigi_identifier):
                job_name += luigi_identifier
            LuigiTask = type(job_name, (PySparkTask, ), {})

            # Expose every option as a luigi parameter on the new class
            for k, v in options.items():
                setattr(LuigiTask, k, luigi.Parameter())

            def main(self, sc, *args):
                job = JobClass(**options)
                return job._run()

            LuigiTask.main = main
            return LuigiTask

When I run my calling function, however, I get:

    PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed

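
For reference, here is a minimal sketch that appears to reproduce the same kind of failure using only the standard library. It rests on an assumption: that the `abc.` prefix in the error comes from the dynamically created class getting `__module__ == 'abc'` when no `__module__` is supplied in the namespace passed to `type()`. `Base`, `make_class`, and `can_pickle` are hypothetical names made up for this illustration; none of this is Luigi code.

```python
import abc
import pickle
import sys


class Base(abc.ABC):
    """Hypothetical stand-in for a base class whose metaclass derives from ABCMeta."""


def make_class(name, set_module):
    # When __module__ is absent from the namespace, the new class is
    # attributed to the module whose code ends up creating it. With an
    # ABCMeta-based metaclass, that is typically the abc module.
    namespace = {'__module__': Base.__module__} if set_module else {}
    return type(name, (Base,), namespace)


Broken = make_class('BrokenTask', set_module=False)
Fixed = make_class('FixedTask', set_module=True)

# pickle stores a class by reference (module name plus class name), so the
# class must also be reachable as an attribute of the module it claims.
setattr(sys.modules[Fixed.__module__], 'FixedTask', Fixed)


def can_pickle(cls):
    """Return True if pickle can serialize a reference to cls."""
    try:
        pickle.dumps(cls)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


print(Broken.__module__, can_pickle(Broken))
print(Fixed.__module__, can_pickle(Fixed))
```

If that assumption holds for the wrapper above, the equivalent change would be to pass something like `{'__module__': __name__}` as the third argument to `type(...)` and to make the resulting class importable from that module. I have not verified this against Luigi or Spark.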
Calling Function:

    def create_task(JobClass, **options):
        LuigiTask = TaskWrapper(JobClass, **options)

        # Add parameters
        parameters = {
            d: options.get(d)
            for d in dir(LuigiTask)
            if not d.startswith('_')
            if isinstance(getattr(LuigiTask, d), luigi.Parameter)
            if d in options
        }

        task = LuigiTask(**parameters)
        return task