Recurring machine learning ETL using Luigi

Published 2019-08-07 14:36

Question:

Today, running the machine learning job I've written is done by hand: I download the needed input files, train and predict, output a .csv file, and then copy it into a database.

However, since this is going into production, I need to automate the whole process. The needed input files will arrive every month (and eventually more frequently) in an S3 bucket from the provider.

Now I'm planning to use Luigi to solve this problem. Here is the ideal process:

  • Every week (or day, or hour, whatever I feel is better) I need my program to watch the S3 bucket for new files
  • When a file arrives, my machine learning pipeline fires and spits out some pandas DataFrames.
  • After that, I need my program to write these results into different DBs

The problem is, I don't know how to use Luigi to automate:

  1. File watching
  2. Task scheduling (e.g. running every month)
  3. Deployment (in a reproducible manner)

Today, here is the pipeline skeleton that I have in mind:

import luigi
import luigi.contrib.hdfs  # HdfsTarget lives under luigi.contrib.hdfs in current Luigi releases

from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done

class Extract(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()
    def requires(self):
        pass
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('data/%Y_%m_%d_') + self.filename)
    def run(self):
        data = read_s3(self.s3_path + '/' + self.filename)
        with self.output().open('w') as hdfs_file:
            write_hdfs(hdfs_file, data)


class Transform(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()
    def requires(self):
        return Extract(self.date, self.s3_path, self.filename)
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('data/results/%Y_%m_%d_') + self.filename)
    def run(self):
        with self.input().open('r') as inputfile:
            data = read_hdfs(inputfile)
        result = ml_algorithm(data)
        with self.output().open('w') as outputfile:
            write_hdfs(outputfile, result)
        mark_as_done(self.filename)



class Load(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    def requires(self):
        return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)]
    def output(self):
        # Fake DB target, just for illustrative purposes
        return luigi.hdfs.DBTarget('...')
    def run(self):
        for input_target in self.input():
            with input_target.open('r') as inputfile:
                result = read_hdfs(inputfile)
            # again, just for didactic purposes
            db = self.output().connection
            write_db(db, result)

Then I would add this to crontab and simply wrap it all in a Docker container.

Questions:

  • Is this the correct pattern that people use to do this? Is there a better way to do it?
  • If I had Transform1 (that depends on the input data) and Transform2 (that depends on Transform1 result) and wanted to save both results into different DBs, how could one implement this using a Luigi pipeline (also in this context of watching files)?
  • Do people use something different than cron for this?
  • How to containerize this properly?

Answer 1:

Your pattern looks largely correct. I would start by using a cron job to call a script that triggers the Load task pipeline. This Load task already checks the S3 bucket for new files, but you would have to make its output conditional as well, for example a status file or something else that signals there is nothing to do. You could also push this check into a higher-level WrapperTask (with no output) that requires the Load task only if there are new files. That same WrapperTask could then require two different Load tasks, which would in turn require your Transform1 and Transform2 respectively.
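A minimal sketch of that wrapper, assuming it lives in the same module as the question's Load task and reuses the hypothetical new_files helper (the name CheckNewFiles is purely illustrative):

import luigi

from mytools import new_files  # same hypothetical helper as in the question's skeleton

class CheckNewFiles(luigi.WrapperTask):
    # No output() needed: a WrapperTask is complete once all its requirements are.
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()

    def requires(self):
        if not new_files(self.s3_path):
            # Nothing new in the bucket, so the wrapper completes immediately.
            return []
        # For the Transform1/Transform2 case, require one Load task per
        # destination DB here instead (e.g. LoadToDB1 and LoadToDB2, names illustrative),
        # each of which requires its own Transform chain.
        return [Load(self.date, self.s3_path)]

A cron entry could then invoke something like luigi --module my_pipeline CheckNewFiles --date 2019-08-07 --s3-path s3://my-bucket/incoming (the module name and bucket path are placeholders).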

Adding in containers: what my cron job really calls is a script that pulls my latest code from git, builds a new image if necessary, and then calls docker run. I have another container that is always up, running luigid. The daily docker run executes a shell script in the container (via CMD) that calls the Luigi task with the parameters needed for that day.
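As a rough sketch of that setup (the repo path, image tag, module, task name and bucket below are placeholder assumptions, not details from the answer), the driver script that cron calls could look something like this:

#!/usr/bin/env python
"""Driver script for cron: refresh the code, rebuild the image, run today's task."""
import datetime
import subprocess

REPO_DIR = "/opt/ml-pipeline"   # hypothetical location of the git checkout
IMAGE = "ml-pipeline:latest"    # hypothetical image tag

def main():
    today = datetime.date.today().isoformat()
    # Pull the latest code and rebuild the image; when nothing changed,
    # the build is essentially a cache hit.
    subprocess.run(["git", "-C", REPO_DIR, "pull"], check=True)
    subprocess.run(["docker", "build", "-t", IMAGE, REPO_DIR], check=True)
    # Run the pipeline for today. The container's CMD/entrypoint could do this
    # itself; here the command is spelled out for clarity. --scheduler-host
    # points at the always-on container running luigid.
    subprocess.run(
        ["docker", "run", "--rm", IMAGE,
         "luigi", "--module", "my_pipeline", "CheckNewFiles",
         "--date", today, "--s3-path", "s3://my-bucket/incoming",
         "--scheduler-host", "luigid"],
        check=True,
    )

if __name__ == "__main__":
    main()

crontab then only needs a single line pointing at this script at whatever cadence fits (hourly, daily, weekly); keeping luigid in its own long-running container means every docker run simply registers its tasks with that central scheduler.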