When a new file arrives in S3, trigger luigi task

Posted 2019-09-15 06:06

Question:

I have a bucket with new objects getting added at random intervals with keys based on their time of creation. For example:

's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time)

In fact, these are the outputs of Scrapy crawls. I want to trigger a task that matches these crawls to a master .csv product catalog file I have (call it "product_catalog.csv"), which also gets updated regularly.

Right now I have several Python scripts, written with global variables that I fill in by hand every time I run this process. Those globals need to become imported attributes.

So here is what needs to happen:

1) A new csv file shows up in "s3://my-bucket/mass/..." with a unique key based on the time the crawl completed. Luigi sees this and begins (a rough sketch of how that trigger could be modeled is below this list).
2) "cleaning.py" gets run by luigi on the new file, so the path of the file that just showed up in S3 needs to be supplied to it at runtime. The results get saved to S3 in addition to being passed on to the next step.
3) The latest version of "product_catalog.csv" is pulled from a database, and "matching.py" matches the output of "cleaning.py" against it.
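One way step 1 is often expressed in luigi is as an external dependency: a task that does no work of its own and simply counts as "done" once the file exists in S3. A minimal sketch, with made-up class and parameter names:

import luigi
from luigi.s3 import S3Target

class CrawlOutput(luigi.ExternalTask):
    # Hypothetical marker task: luigi treats it as complete as soon as the
    # crawl's csv exists in S3, so downstream tasks can simply require() it.
    key = luigi.Parameter()  # e.g. "mass/2019/09/15/06/some_crawl_0600.csv"

    def output(self):
        return S3Target("s3://my-bucket/{}".format(self.key))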

I realize this may not make complete sense. I will supply edits as necessary to make it all more clear.

EDIT

Based on the initial answers, I have configured this as a pull operation that saves intermediate results along the way. But now I am pretty lost. It should be noted that this is my first time tying a Python project together, so there are things like including __init__.py files that I am learning as I go. As usual, it is a bumpy road of excitement at each success followed immediately by confusion at the next roadblock.

Here are my questions:
1) How to import the spiders from Scrapy is unclear to me. I have about a dozen of them and the goal is to have luigi manage the process of crawl>clean>match for all of them. The Scrapy documentation says to include:

class MySpider(scrapy.Spider):
    # Your spider definition

What does that mean? Rewrite the spider inside the script that controls it? That makes no sense to me, and their examples are not helpful.
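For reference, the docs appear to mean that you import your existing spider class into the controlling script and hand it to a CrawlerProcess rather than redefining it. A rough sketch, where the module and class names are guesses based on the code further down:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# Import the existing spider class from the Scrapy project instead of rewriting it here.
from my_crawlers.my_crawlers.spiders.my_spider import MySpider

process = CrawlerProcess(get_project_settings())
process.crawl(MySpider)  # a spider name string also works when project settings are loaded
process.start()          # blocks until the crawl finishes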

2) I have configured Scrapy pipelines to export to S3 but luigi also seems to do this with output(). Which should I use and how do I get them to play together?
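For context, the Scrapy side of this is usually a feed export configured in settings.py, roughly like the following (the bucket and exact values here are assumptions):

# settings.py of the Scrapy project (a sketch)
FEED_FORMAT = "csv"
FEED_URI = "s3://my-bucket/mass/%(time)s_%(name)s.csv"  # %(name)s and %(time)s are filled in by Scrapy
AWS_ACCESS_KEY_ID = "..."
AWS_SECRET_ACCESS_KEY = "..."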

3) Luigi says that CrawlTask() ran successfully but that is wrong because it completes in seconds and the crawls usually take a few minutes. There is also no output file corresponding to success.

4) Where do I supply the credentials for S3?
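On the luigi side, credentials can be handed to S3Client directly; if they are omitted, luigi falls back to its [s3] config section and then to boto's usual environment variables. A small sketch:

from luigi.s3 import S3Client, S3Target

# Explicit credentials; leaving them out makes luigi look in its config / environment.
client = S3Client(aws_access_key_id="...", aws_secret_access_key="...")
target = S3Target("s3://my-bucket/mass/some_file.csv", client=client)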

Here is my code. I have commented out the things that weren't working in favor of what I perceive to be better. But my sense is that there is a grand architecture to what I want to do that I just don't understand yet.

import luigi
from luigi.s3 import S3Target, S3Client
import my_matching
from datetime import datetime
import os
import scrapy
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders import my_spider

class CrawlTask(luigi.Task):
    crawltime = datetime.now()
    spider = luigi.Parameter()
    #vertical = luigi.Parameter()

    def requires(self):
        pass

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime))

    def run(self):
        os.system("scrapy crawl %s" % self.spider)
        #process = CrawlerProcess(get_project_settings())
        #process.crawl("%s" % self.spider)
        #process.start()

class FetchPC(luigi.Task):
    vertical = luigi.Parameter()

    def output(self):
        # output() should return a Target object rather than a bare path string
        if self.vertical == "product1":
            return luigi.LocalTarget("actual_data_staging/product1_catalog.csv")
        elif self.vertical == "product2":
            return luigi.LocalTarget("actual_data_staging/product2_catalog.csv")

class MatchTask(luigi.Task):
    crawltime = CrawlTask.crawltime
    vertical = luigi.Parameter()
    spider = luigi.Parameter()

    def requires(self):
        # both dependencies need to be returned together; a second return statement is unreachable
        return [CrawlTask(spider=self.spider), FetchPC(vertical=self.vertical)]

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime))

    def run(self):
        if self.vertical == 'product1':
            # self.input() gives the targets of the required tasks, in the order of requires()
            crawl_target, catalog_target = self.input()
            my_matching.switch_board(crawl_target, catalog_target)

The MatchTask refers to a python script I wrote that compares the scraped products to my product catalog. It looks like this:

def create_search(value):
...
def clean_column(column):
...
def color_false_positive():
...
def switch_board(scrape, product_catalog):
# this function coordinates the whole script

Answer 1:

Below is a very rough outline of how it could look. I think the main difference from your outline is that luigi works as a pull system: you specify the output you want first, and that output then triggers the other tasks it depends on. So, rather than naming things after the time the crawl ends, it is easier to name things after something you know at the start. It is possible to do it the other way, but it adds a lot of unnecessary complication.

import luigi
from luigi.s3 import S3Target

class CrawlTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        pass

    def get_filename(self):
        return "s3://my-bucket/crawl_{}.csv".format(self.crawltime)

    def output(self):
        return S3Target(self.get_filename())

    def run(self):
        perform_crawl(s3_filename=self.get_filename())


class CleanTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        return CrawlTask(crawltime=self.crawltime)

    def get_filename(self):
        return "s3://my-bucket/clean_crawl_{}.csv".format(self.crawltime)

    def output(self):
        return S3Target(self.get_filename())

    def run(self):
        perform_clean(input_file=self.input().path, output_filename=self.get_filename())


class MatchTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        return CleanTask(crawltime=self.crawltime)

    def output(self):
        return ##?? whatever output of this task is

    def run(self):
        perform_match(input_file=self.input().path)
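Then the whole chain can be kicked off by asking for the final output, for example (assuming the tasks above live in one module):

import datetime
import luigi

# Requesting the final MatchTask pulls in CleanTask and CrawlTask automatically.
luigi.build([MatchTask(crawltime=datetime.date(2019, 9, 15))], local_scheduler=True)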


Answer 2:

What you could do is create a larger system that encapsulates both your crawls and your processing. That way you don't have to check S3 for new objects. I haven't used luigi before, but maybe you can turn your Scrapy job into a task, and when it's done run your processing task. In any case, I don't think 'checking' S3 for new objects is a good idea, because 1. it takes a lot of API calls, and 2. you will need to write a bunch of code to decide whether something is 'new' or not, which could get hairy.
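A minimal sketch of that idea, reusing the task names from the first answer (so this is an assumption about how the pieces fit together):

import luigi

class ProcessCrawl(luigi.WrapperTask):
    # Umbrella task: scheduling this for a given date runs the crawl and all
    # downstream processing, so nothing has to poll S3 for new files.
    crawltime = luigi.DateParameter()

    def requires(self):
        yield MatchTask(crawltime=self.crawltime)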