Make failure of a dynamic Luigi task non-critical

Posted 2019-08-22 00:24

Question:

I have a Luigi workflow that downloads a bunch of large files via FTP and deposits them on S3.

I have one task that reads a list of files to download and then creates a bunch of tasks that actually do the downloads.

The idea is that the result of this workflow is a single file containing a list of downloads that have succeeded, with any failed downloads being reattempted on the next run the following day.

The problem is that if any of the download tasks fails then the successful download list is never created.

This is because the dynamically created tasks become requirements of the main task that creates them and compiles a list from their outputs.

Is there a way to make failures of these download tasks insignificant, so that the list is compiled minus the output of the failed tasks?

Example code is below; GetFiles is the task that we call from the command line.

import json

import luigi
from luigi.contrib.s3 import S3Client, S3Target
from luigi.util import requires

# WriteFileFromFtp, MakeSuccessfulOutputList, GetListOfFileToDownload and
# someOutputPath are defined elsewhere and are not shown in this post.


class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):

    def run(self):

        with self.input().open('r') as fileList:
            files = json.load(fileList)

            tasks = []
            taskOutputs = []

            for file in files:
                task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
                tasks.append(task)
                taskOutputs.append(task.output())

            # Dynamic dependencies: Luigi runs these before resuming this task.
            yield tasks

            successfulDownloads = MakeSuccessfulOutputList(taskOutputs)

        with self.output().open('w') as out:
            json.dump(successfulDownloads, out)

    def output(self):
        client = S3Client()
        return S3Target(path='successfulDownloads.json', client=client)

Answer 1:

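I have read the documentation a few times and found no mention of anything like non-critical failures. That said, this behavior can be achieved by overriding the Task.complete method in DownloadFileFromFtp, while still being able to use DownloadFileFromFtp.output in GetFiles.run.

By overriding it to return True, Luigi treats the DownloadFileFromFtp task as complete regardless of whether the download succeeded. Be aware that Luigi also calls complete() before it schedules a task, so an unconditional True may cause run() to be skipped entirely; the check likely needs to return False until a download has at least been attempted.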

class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

    def complete(self):
        # Always report the task as complete, whether or not the download succeeded.
        return True

Note, however, that you could also use more complex logic in the complete method, such as reporting the task as incomplete only if it hit a specific network failure at run time.
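
Whichever complete logic is used, GetFiles.run still has to leave out the targets that a failed download never wrote. The question calls MakeSuccessfulOutputList without showing it, so the following is only a minimal sketch of what such a helper might look like, assuming each target exposes exists() and path the way Luigi's S3Target does. FakeTarget and the s3://bucket/... paths are hypothetical stand-ins so that the example runs without S3 access.

    ```python
    def MakeSuccessfulOutputList(task_outputs):
        """Return the paths of the targets that actually exist.

        Assumption: each target has an exists() method and a path attribute.
        """
        return [target.path for target in task_outputs if target.exists()]


    class FakeTarget:
        """Hypothetical stand-in for S3Target, for illustration only."""

        def __init__(self, path, present):
            self.path = path
            self._present = present

        def exists(self):
            return self._present


    # One download that produced its output and one that did not.
    outputs = [
        FakeTarget("s3://bucket/a.bin", present=True),
        FakeTarget("s3://bucket/b.bin", present=False),
    ]
    successful = MakeSuccessfulOutputList(outputs)
    print(successful)
    ```

Filtering on exists() keeps the list tied to what is really on S3, so a download that failed today is simply absent from the list and can be attempted again on the next run.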



Tags: luigi