Today I run the machine learning job I've written by hand: I download the needed input files, train the model and generate predictions, output a .csv file, and then copy it into a database.
However, since this is going into production, I need to automate the whole process. The input files will arrive in an S3 bucket from the provider every month (and eventually more frequently).
I'm now planning to use Luigi to solve this. Here is the ideal process:
- Every week (or day, or hour, whatever turns out to be best), I need my program to watch the S3 bucket for new files
- When a file arrives, my machine learning pipeline fires and produces some pandas DataFrames
- After that, I need my program to write these results into different DBs
The problem is, I don't know how to use Luigi to automate:
- File watching
- Scheduling tasks (e.g. monthly)
- Deploying it (in a reproducible manner)
For now, here is the pipeline skeleton I have in mind:
import luigi
from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done


class Extract(luigi.Task):
    # Copy one raw input file from S3 into HDFS
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        pass

    def output(self):
        return luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d_') + self.filename)

    def run(self):
        data = read_s3(self.s3_path + '/' + self.filename)
        with self.output().open('w') as hdfs_file:
            write_hdfs(hdfs_file, data)


class Transform(luigi.Task):
    # Run the ML algorithm on one extracted file and store the results in HDFS
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        return Extract(self.date, self.s3_path, self.filename)

    def output(self):
        return luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_') + self.filename)

    def run(self):
        with self.input().open('r') as inputfile:
            data = read_hdfs(inputfile)
        result = ml_algorithm(data)
        with self.output().open('w') as outputfile:
            write_hdfs(outputfile, result)
        mark_as_done(self.filename)


class Load(luigi.Task):
    # Write the results of every new file's Transform into the DB
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()

    def requires(self):
        return [Transform(self.date, self.s3_path, filename)
                for filename in new_files(self.s3_path)]

    def output(self):
        # Fake DB target, just for illustrative purposes
        return luigi.hdfs.DBTarget('...')

    def run(self):
        for input_target in self.input():
            with input_target.open('r') as inputfile:
                result = read_hdfs(inputfile)
            # again, just for didactic purposes
            db = self.output().connection
            write_db(db, result)
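For context, new_files and mark_as_done are thin S3 helpers from my mytools module. The boto3 sketch below only shows the idea; the signatures are simplified and my real implementation may differ:

import boto3

s3 = boto3.client('s3')
DONE_PREFIX = 'processed/'  # illustrative marker prefix


def _bucket_and_prefix(s3_path):
    # 's3://bucket/incoming' -> ('bucket', 'incoming/')
    bucket, _, prefix = s3_path.replace('s3://', '', 1).partition('/')
    if prefix and not prefix.endswith('/'):
        prefix += '/'
    return bucket, prefix


def _keys(bucket, prefix):
    # list object keys under the prefix, with the prefix stripped
    # (ignores pagination for brevity)
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    return [obj['Key'][len(prefix):] for obj in response.get('Contents', [])]


def new_files(s3_path):
    # filenames under s3_path that have no 'done' marker yet
    bucket, prefix = _bucket_and_prefix(s3_path)
    done = set(_keys(bucket, DONE_PREFIX))
    return [name for name in _keys(bucket, prefix)
            if name and name not in done and not name.startswith(DONE_PREFIX)]


def mark_as_done(s3_path, filename):
    # drop an empty marker object so the file is skipped on the next run
    s3.put_object(Bucket=_bucket_and_prefix(s3_path)[0],
                  Key=DONE_PREFIX + filename, Body=b'')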
Then I would add this to crontab and simply wrap it into a Docker container.
Questions:
- Is this the correct pattern that people use to do this? Is there a better way to do it?
- If I had Transform1 (which depends on the input data) and Transform2 (which depends on Transform1's result) and wanted to save both results into different DBs, how could one implement this with a Luigi pipeline (also in this context of watching files)?
- Do people use something other than cron for this?
- How do I containerize this properly?
Your pattern looks largely correct. I would start by using a cron job to call a script that triggers the Load task pipeline. It looks like this Load task already verifies the existence of new files in the S3 bucket, but you would have to change its output to be conditional as well, which could be a status file or something else for the case where there is nothing to do. You could also handle this in a higher-level WrapperTask (with no output) that requires the Load task only if there are new files. You could then use that WrapperTask to require two different Load tasks, which would in turn require your Transform1 and Transform2 respectively.
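Sketched out, that wrapper could look like the following. The two Load subclasses and the LoadAll name are hypothetical stand-ins for your own tasks, and new_files is the helper from your skeleton:

import luigi
from mytools import new_files  # the helper from your skeleton


class LoadFromTransform1(luigi.Task):
    # Hypothetical Load variant whose requires() returns Transform1 tasks
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    # ... requires()/output()/run() as in your Load, depending on Transform1


class LoadFromTransform2(luigi.Task):
    # Hypothetical Load variant whose requires() returns Transform2 tasks
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    # ... same idea, but depending on Transform2 and writing to the other DB


class LoadAll(luigi.WrapperTask):
    # Top-level task the cron script triggers; it has no output of its own
    # and only requires the Load tasks when the bucket has unprocessed files
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()

    def requires(self):
        if not new_files(self.s3_path):
            return []  # nothing new, so the wrapper completes immediately
        return [LoadFromTransform1(self.date, self.s3_path),
                LoadFromTransform2(self.date, self.s3_path)]

Because a WrapperTask is complete exactly when all of its requirements are complete, a run with no new files is an immediate no-op.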
Adding in containers: what my cron job really calls is a script that pulls my latest code from git, builds a new container if necessary, and then calls docker run. I have another container that is always up, running luigid. The daily docker run executes a shell script in the container (via CMD) that calls the Luigi task with the parameters needed for that day.
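The part of that shell script that actually kicks off Luigi can be a tiny Python entry point like the one below. The file name, the mypipeline module, and the bucket path are placeholders, and it assumes the LoadAll wrapper sketched above:

# run_pipeline.py -- hypothetical entry point invoked by cron / the container
import datetime

import luigi

from mypipeline import LoadAll  # placeholder module holding the tasks above

if __name__ == '__main__':
    luigi.build(
        [LoadAll(date=datetime.date.today(), s3_path='s3://my-bucket/incoming')],
        workers=1,
        # pass local_scheduler=True here as well if you are not running
        # a central luigid scheduler
    )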