How to Dynamically create a Luigi Task

Published 2020-07-10 05:43

Question:

I am building a wrapper for Luigi tasks and ran into a snag: Luigi's Register class is actually an ABC metaclass (it derives from abc.ABCMeta), and the task classes I create dynamically with type() cannot be pickled.

The following code is more or less what I use to build the dynamic class:

import luigi
from luigi.contrib.spark import PySparkTask


class TaskWrapper(object):
    '''Build a Luigi PySparkTask class from the provided JobClass.

    Args:
        JobClass(ScrubbedClass): The job to wrap
        **options: Keyword arguments passed to JobClass
    '''

    def __new__(cls, JobClass, **options):
        # Validate we have a good job: it must derive from one of the
        # valid classes without being one of them
        valid_classes = (
            ScrubbedClass01,
            # ScrubbedClass02,
            # ScrubbedClass03,
        )
        if JobClass in valid_classes or not issubclass(JobClass, valid_classes):
            raise TypeError('Job is not the correct class: {}'.format(JobClass))

        # Build a luigi task class dynamically
        luigi_identifier = 'Task'
        job_name = JobClass.__name__
        job_name = job_name.replace('Pail', '')
        if not job_name.endswith(luigi_identifier):
            job_name += luigi_identifier

        LuigiTask = type(job_name, (PySparkTask, ), {})

        # Declare one luigi.Parameter per option
        for k in options:
            setattr(LuigiTask, k, luigi.Parameter())

        def main(self, sc, *args):
            job = JobClass(**options)
            return job._run()

        LuigiTask.main = main

        return LuigiTask

When I run my calling function, however, I get this error:

PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed

Calling Function:

def create_task(JobClass, **options):
    LuigiTask = TaskWrapper(JobClass, **options)
    # Pass only the options that are declared as luigi.Parameters
    parameters = {
        d: options.get(d)
        for d in dir(LuigiTask)
        if not d.startswith('_')
        if isinstance(getattr(LuigiTask, d), luigi.Parameter)
        if d in options
    }

    task = LuigiTask(**parameters)
    return task
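The failure can be reproduced without Luigi or Spark. This is a minimal sketch that assumes only that the task's metaclass derives from abc.ABCMeta; Register and BaseTask below are hypothetical stand-ins for Luigi's Register metaclass and PySparkTask, not the real classes:

```python
import abc
import pickle


class Register(abc.ABCMeta):
    """Hypothetical stand-in for luigi's Register metaclass."""


class BaseTask(metaclass=Register):
    """Hypothetical stand-in for PySparkTask."""


# Same call shape as in TaskWrapper: the namespace has no __module__ entry.
DynamicTask = type("ScrubbedNameTask", (BaseTask,), {})

try:
    # Pickling an instance pickles its class by reference (module + name).
    pickle.dumps(DynamicTask())
except pickle.PicklingError as exc:
    print("PicklingError:", exc)
```

The printed message names abc.ScrubbedNameTask, matching the error above.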

Answer 1:

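When a class is created dynamically with type() and its base has a metaclass derived from abc.ABCMeta (as Luigi's Register is), the new class's __module__ becomes abc. This happens because ABCMeta.__new__ is written in Python in the abc module and calls type.__new__, which fills in a missing __module__ from the globals of the calling frame, i.e. those of abc. Pickle records classes by module and name, so it looks for the task class in the abc module (as a worker would when loading the task), and of course it does not exist there.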
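The effect of the metaclass can be seen by calling type() with two bases that differ only in their metaclass. A minimal sketch, assuming CPython; PlainBase, AbstractBase, FromPlain and FromAbstract are illustrative names only:

```python
import abc


class PlainBase:
    """Base class whose metaclass is plain `type`."""


class AbstractBase(abc.ABC):
    """Base class whose metaclass is abc.ABCMeta."""


# Neither namespace contains "__module__", so type.__new__ takes it from the
# globals of the Python frame that is executing when it runs.
FromPlain = type("FromPlain", (PlainBase,), {})           # frame: this module
FromAbstract = type("FromAbstract", (AbstractBase,), {})  # frame: ABCMeta.__new__

print(FromPlain.__module__)     # e.g. __main__ when run as a script
print(FromAbstract.__module__)  # abc
```

A regular class statement never hits this, because the compiler always puts __module__ into the class namespace.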

To solve this, make sure Luigi knows where to find the code that builds the class by explicitly setting __module__ in the class namespace to the module that defines TaskWrapper.

Change the line to:

LuigiTask = type(job_name, (PySparkTask, ), {'__module__':__name__})
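Setting __module__ tells pickle which module to look in; pickle then also has to find the class under its name in that module. The sketch below, again using hypothetical Register and BaseTask stand-ins and a hypothetical make_task_class helper, does both and round-trips an instance within one process. A worker in a separate process would additionally need the class to exist after it imports that module, for example by creating it at module level.

```python
import abc
import pickle


class Register(abc.ABCMeta):
    """Hypothetical stand-in for luigi's Register metaclass."""


class BaseTask(metaclass=Register):
    """Hypothetical stand-in for PySparkTask."""


def make_task_class(name):
    """Hypothetical helper: build a task class that pickle can locate."""
    # The fix from the answer: record this module instead of "abc".
    cls = type(name, (BaseTask,), {"__module__": __name__})
    # pickle looks classes up as <module>.<name>, so also bind the class
    # under that name in this module's namespace.
    globals()[name] = cls
    return cls


TaskCls = make_task_class("ScrubbedNameTask")
restored = pickle.loads(pickle.dumps(TaskCls()))
print(type(restored) is TaskCls)  # True
```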

As far as I know, this is only a problem on Windows.