My initial files are in AWS S3. Could someone point me to how I need to set this up in a Luigi Task?
I reviewed the documentation and found luigi.S3, but it is not clear to me what to do with it. I then searched the web and only found links to mortar-luigi and implementations on top of Luigi.
UPDATE
After following the example provided by @matagus (I also created the ~/.boto file as suggested):
# coding: utf-8
import luigi
from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):
    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None
        for input in self.input():
            print("Doing something ...")
            with input.open('r') as f:
                for line in f:
                    result = 'This is a line'
        if result:
            out_file = self.output().open('w')
            out_file.write(result)
When I execute it, nothing happens:
DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task ProcessS3File() has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ProcessS3File() has status DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread
As you can see, the message "Doing something ..." never prints. What is wrong?
The key here is to define an ExternalTask that has no inputs and whose outputs are the files you already have living in S3. The Luigi docs mention this in Requiring another Task:
So, basically you end up with something like this:
UPDATE:
Luigi uses boto to read files from and/or write them to AWS S3, so to make this code work you'll need to provide your credentials in your boto config file ~/.boto (look for other possible config file locations here):
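A boto config file uses an INI layout with a [Credentials] section. A minimal sketch, where the two values are placeholders to replace with your own keys:

```ini
[Credentials]
aws_access_key_id = <your_access_key_id>
aws_secret_access_key = <your_secret_access_key>
```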