I have a bucket with new objects getting added at random intervals with keys based on their time of creation. For example:
's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time)
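That pattern produces keys along the lines of (made-up values for name and the_time):

s3://my-bucket/mass/2017/04/21/08/myspider_20170421081500.csv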
In fact, these are the outputs of Scrapy crawls. I want to trigger a task that matches these crawls to a master .csv product catalog file I have (call it "product_catalog.csv"), which also gets updated regularly.
Right now, I have several Python scripts I have written with global variables that I fill in by hand every time I run this process. Those globals need to become attributes that are imported or passed in at runtime.
So here is what needs to happen:
1) New csv file shows up in "s3://my-bucket/mass/..." with a unique key name based on the time the crawl completed. Luigi sees this and begins.
2) "cleaning.py" gets run by luigi on the new file, so the parameter of "cleaning.py" (the file that showed up in S3) needs to be supplied to it at runtime. The results get saved in S3 in addition to being passed on to the next step.
3) The latest version of "product_catalog.csv" is pulled from a database, and "matching.py" matches the results of "cleaning.py" against it (a rough Luigi sketch of this chain follows below).
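To make the shape of this concrete, here is how I picture the dependency chain in Luigi. This is only a sketch: CleanTask, MatchTask, clean(), match() and fetch_product_catalog() are stand-in names, not working code.

import luigi
from luigi.s3 import S3Target

class CleanTask(luigi.Task):
    crawl_key = luigi.Parameter()  # S3 key of the crawl output that just appeared

    def output(self):
        # save the cleaned file next to the raw one (hypothetical naming scheme)
        return S3Target(self.crawl_key.replace('.csv', '_cleaned.csv'))

    def run(self):
        with S3Target(self.crawl_key).open('r') as raw, self.output().open('w') as out:
            out.write(clean(raw.read()))  # clean() would live in cleaning.py

class MatchTask(luigi.Task):
    crawl_key = luigi.Parameter()

    def requires(self):
        return CleanTask(crawl_key=self.crawl_key)

    def output(self):
        return S3Target(self.crawl_key.replace('.csv', '_matched.csv'))

    def run(self):
        catalog = fetch_product_catalog()  # pull the latest product_catalog.csv from the database
        with self.input().open('r') as cleaned, self.output().open('w') as out:
            out.write(match(cleaned.read(), catalog))  # match() would live in matching.py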
I realize this may not make complete sense. I will supply edits as necessary to make it all more clear.
EDIT
Based on initial answers, I have configured this to be a pull operation that saves steps along the way. But now I am pretty lost. It should be noted that this is my first time tying a Python project together, so there are things, like including __init__.py files, that I am learning as I go. As usual, it is a bumpy road: excitement at each success, followed immediately by confusion at the next roadblock.
Here are my questions:
1) How to import the spiders from Scrapy is unclear to me. I have about a dozen of them and the goal is to have luigi manage the process of crawl>clean>match for all of them. The Scrapy documentation says to include:
class MySpider(scrapy.Spider):
    # Your spider definition
What does that mean? Re-write the spider in the script controlling the spider? That makes no sense and their examples are not helpful.
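My best guess at what the docs intend is that the spider class only needs to be importable, not re-typed, something like this (guessing; MySpider stands in for whatever class lives in my module):

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders.my_spider import MySpider  # import it, don't re-write it

process = CrawlerProcess(get_project_settings())
process.crawl(MySpider)  # with project settings loaded, process.crawl('spider_name') also works
process.start()          # blocks until the crawl finishes

Is that the intended reading?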
2) I have configured Scrapy pipelines to export to S3 but luigi also seems to do this with output(). Which should I use and how do I get them to play together?
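For reference, the export side of my Scrapy setup looks roughly like this in settings.py (a feed-export sketch, keys redacted):

FEED_FORMAT = 'csv'
FEED_URI = 's3://my-bucket/mass/%(name)s/%(time)s.csv'  # %(name)s and %(time)s are filled in by Scrapy
AWS_ACCESS_KEY_ID = '...'
AWS_SECRET_ACCESS_KEY = '...'

Should luigi's output() point at the same key Scrapy writes, or should Scrapy write locally and luigi handle the upload?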
3) Luigi says that CrawlTask() ran successfully but that is wrong because it completes in seconds and the crawls usually take a few minutes. There is also no output file corresponding to success.
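One thing I plan to try is swapping os.system for subprocess, so a failing crawl at least raises instead of looking successful:

import subprocess

def run(self):
    # check_call raises CalledProcessError on a non-zero exit code,
    # whereas os.system's return value was being ignored
    subprocess.check_call(["scrapy", "crawl", self.spider])

But I don't know whether that explains the missing output file.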
4) Where do I supply the credentials for S3?
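The closest thing I have found is an [s3] section in luigi's config file (client.cfg or luigi.cfg), which I believe looks like this (keys redacted, unverified):

[s3]
aws_access_key_id=...
aws_secret_access_key=...

Is that the right mechanism, or should I rely on boto picking up AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment?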
Here is my code. I have commented out the things that weren't working in favor of what I perceive to be better. But my sense is that there is a grand architecture to what I want to do that I just don't understand yet.
import luigi
from luigi.s3 import S3Target, S3Client
import my_matching
from datetime import datetime
import os
import scrapy
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders import my_spider
class CrawlTask(luigi.Task):
    crawltime = datetime.now()  # evaluated once, at import time
    spider = luigi.Parameter()
    #vertical = luigi.Parameter()

    def requires(self):
        pass

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime))

    def run(self):
        os.system("scrapy crawl %s" % self.spider)  # exit status is ignored here
        #process = CrawlerProcess(get_project_settings())
        #process.crawl("%s" % self.spider)
        #process.start()
class FetchPC(luigi.Task):
    vertical = luigi.Parameter()

    def output(self):
        # outputs must be Target objects, not plain strings;
        # no run() yet, so this assumes the catalog has already been pulled from the database
        if self.vertical == "product1":
            return luigi.LocalTarget("actual_data_staging/product1_catalog.csv")
        elif self.vertical == "product2":
            return luigi.LocalTarget("actual_data_staging/product2_catalog.csv")
class MatchTask(luigi.Task):
    crawltime = CrawlTask.crawltime  # shares the import-time timestamp with CrawlTask
    vertical = luigi.Parameter()
    spider = luigi.Parameter()

    def requires(self):
        # declare both dependencies together; two bare return
        # statements would leave the second one unreachable
        return [CrawlTask(spider=self.spider), FetchPC(vertical=self.vertical)]

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime))

    def run(self):
        if self.vertical == 'product1':
            # hand the upstream targets to my matching script
            crawl_target, catalog_target = self.input()
            my_matching.switch_board(crawl_target, catalog_target)
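For what it's worth, I have been invoking this with the local scheduler while debugging, along these lines (my_pipeline is whatever the module above is saved as):

luigi --module my_pipeline MatchTask --spider my_spider --vertical product1 --local-scheduler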
MatchTask relies on a Python script I wrote (my_matching, imported above) that compares the scraped products to my product catalog. It looks like this:
def create_search(value):
    ...

def clean_column(column):
    ...

def color_false_positive():
    ...

def switch_board(scrape, product_catalog):
    # this function coordinates the whole script
    ...