Today I run the machine learning job I've written by hand: I download the required input files, train the model and generate predictions, write the results to a .csv file, and then copy that file into a database.
Since this is going into production, I need to automate the whole process. The input files will arrive from the provider every month (and eventually more frequently) in an S3 bucket.
I'm planning to use Luigi to solve this. Here is the ideal process:
- Every week (or day, or hour, whatever turns out to be best), my program needs to watch the S3 bucket for new files (see the sketch after this list for what I mean by "watching")
- When a new file arrives, my machine learning pipeline fires and produces some pandas DataFrames
- After that, my program needs to write these results into different DBs
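To make the "watching" part concrete, here is the kind of polling I have in mind. This is just a sketch: the bucket name in the usage example and the idea of tracking already-processed keys are made up, and I'm using plain boto3 rather than anything Luigi-specific:

import boto3

def list_new_files(bucket, prefix, already_processed):
    """Return the S3 keys under `prefix` that have not been processed yet."""
    s3 = boto3.client('s3')
    # list the objects currently in the bucket (ignores pagination, fine for a sketch)
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    keys = [obj['Key'] for obj in response.get('Contents', [])]
    return [key for key in keys if key not in already_processed]

# e.g. new_keys = list_new_files('my-provider-bucket', 'incoming/', seen_keys)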
The problem is, I don't know how to use Luigi to automate:
- File watching
- Task scheduling (e.g. monthly)
- Deployment (in a reproducible manner)
Here is the pipeline skeleton I currently have in mind:
import luigi
from luigi.contrib.hdfs import HdfsTarget

from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done


class Extract(luigi.Task):
    """Pull one raw input file from S3 and stage it in HDFS."""
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()

    def output(self):
        return HdfsTarget(self.date.strftime('data/%Y_%m_%d_') + self.filename)

    def run(self):
        data = read_s3(self.s3_path + '/' + self.filename)
        with self.output().open('w') as hdfs_file:
            write_hdfs(hdfs_file, data)


class Transform(luigi.Task):
    """Run the ML algorithm on one staged file and store the result in HDFS."""
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        return Extract(self.date, self.s3_path, self.filename)

    def output(self):
        return HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_') + self.filename)

    def run(self):
        with self.input().open('r') as inputfile:
            data = read_hdfs(inputfile)
        result = ml_algorithm(data)
        with self.output().open('w') as outputfile:
            write_hdfs(outputfile, result)
        mark_as_done(self.filename)


class Load(luigi.Task):
    """Write the results of all new files into the database."""
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()

    def requires(self):
        return [Transform(self.date, self.s3_path, filename)
                for filename in new_files(self.s3_path)]

    def output(self):
        # Fake DB target, just for illustrative purposes
        return DBTarget('...')

    def run(self):
        for input_target in self.input():
            with input_target.open('r') as inputfile:
                result = read_hdfs(inputfile)
            # again, just for didactic purposes
            db = self.output().connection
            write_db(db, result)
Then I would add this to crontab and wrap everything into a Docker container; a rough sketch of what I mean is below.
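Roughly, the cron entry and the Dockerfile I picture look like this (the schedule, image name, bucket and module names are all made up; the luigi command line and the --local-scheduler flag are the standard ones):

# crontab entry, made-up schedule (every Monday at 06:00); note % must be escaped in cron
0 6 * * 1 docker run --rm my-ml-pipeline luigi --module pipeline Load --date $(date +\%Y-\%m-\%d) --s3-path s3://my-provider-bucket/incoming --local-scheduler

# Dockerfile, minimal sketch, assuming the pipeline code and requirements.txt are in the build context
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt   # luigi, pandas, boto3, ...
COPY . .
ENV PYTHONPATH=/app   # so `luigi --module pipeline` can import the pipeline module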
Questions:
- Is this the correct pattern that people use to do this? Is there a better way to do it?
- If I had Transform1 (which depends on the input data) and Transform2 (which depends on Transform1's result) and wanted to save both results into different DBs, how could I implement this with a Luigi pipeline, still in this context of watching files? A rough sketch of the dependency structure I have in mind is below this list.
- Do people use something different from cron for this?
- How do I containerize this properly?
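For the Transform1/Transform2 question, this is roughly the dependency structure I have in mind, extending the skeleton above (all of the new class names and the second ML step are placeholders):

class Transform1(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        return Extract(self.date, self.s3_path, self.filename)

    # output() and run() as in Transform above


class Transform2(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        # depends on Transform1's result rather than on the raw input
        return Transform1(self.date, self.s3_path, self.filename)

    # output() and run() as in Transform above, but applying the second ML step


class LoadDB1(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()

    def requires(self):
        return [Transform1(self.date, self.s3_path, filename)
                for filename in new_files(self.s3_path)]

    def run(self):
        # read each Transform1 result and write it into the first database
        ...


class LoadDB2(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()

    def requires(self):
        return [Transform2(self.date, self.s3_path, filename)
                for filename in new_files(self.s3_path)]

    def run(self):
        # read each Transform2 result and write it into the second database
        ...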