Is Scrapy's asynchronicity what is hindering m

2019-08-10 12:15发布

问题:

My scrapy project "drills down" from list pages, retrieving data for the listed items at varying levels up to several deep. There could be many pages of listed items with handfuls of different items/links on each page. I'm collecting details (and storing them in a single CSV Excel file) of each of the items from: the page it is listed on, the page link in that list ("more details" page), and yet another page - the original listing by the item's manufacturer, let's say.

Because I am building a CSV file, it would be VERY helpful to put each item's data on a single line before my parse process moves along to the next item. I could do it nicely if only I could get a Request to launch when I demand it while I am writing the CSV line for that item on the list page it appears on. I would just "drill down" as many levels as I need with a different parse function for each level, if needed, staying with a single item all the way until I have the entire CSV file line that it will need.

Instead of it being that easy, it appears that I am going to have to re-write the CSV file for EVERY ITEM at EVERY LEVEL because I can't get scrapy to give me the items' "more details" links responses until I've exited the entire parse function of the page of items listing, thus the end of my CSV file is no longer at the item being processed, and I'm having to have a unique field on each line to look each item up at each level, re-write the file, etc.

Understand, I can't know which callback level will be the last one for any particular item. That is determined on an item-by-item basis. Some items won't even have "deeper" levels. My only idea left is to have only a single recursive callback function that handles all callback levels, but is that way this kind of thing is done by the rest of you, or does scrapy have some means of "Request and wait for response" or something similar? I'm not wanting to install a sql database on my laptop, never having set one up before.

Thank you!!!

from scrapy.spider import Spider
from scrapy.selector import Selector
from scrapy.contrib.spiders import CrawlSpider, Rule
from scrapy.contrib.exporter import CsvItemExporter
import csv
from meow.items import meowItem, meowPage
from scrapy.http import Request
import os
from mmap import mmap

class meowlistpage(Spider):
    name="melist"
    prefixhref='http://www.meow.com'
    #add '2_p/', '3_p/', or '4_p/', etc. to get to meow's other pages
    start_urls = [prefixhref+"/homes/for_sale/CO/house,mobile,land_type/10_rid/3000-30000_price/11-117_mp/800000-8000000_lot/lot_sort/46.377254,-96.82251,30.845647,-114.312744_rect/5_zm/1_p/1_rs/"]
    print 'Retrieving first page...'
    def parse(self, response):
        print 'First page retrieved'
        name="melist";prefixhref='http://www.meow.com';
        csvfilename = 'C:\\Python27\\My scripts\\meow\\'+name+'.csv';csvfile = open(csvfilename, 'w');pass;csvfile.close()
        hxs = Selector(response)
        page_tags=hxs.xpath("//div[@id='search-results']/article")
        for page_tags in page_tags:
            item = meowItem()
            item['ad_link']=prefixhref+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@href").extract())[3:-2]
            idendplace=str(item['ad_link']).index('_zpid')-12; backhashstr=str(item['ad_link'])[idendplace:]; 
            idstartplace=backhashstr.index('/')+1; idendplace=len(backhashstr)-backhashstr.index('_zpid');
            item['zpid']=str(backhashstr)[idstartplace:-idendplace]
            item['sale_sold']=str(page_tags.xpath(".//div[1]/dl[1]/dt[1]/@class").extract())[8:-17]#"recentlySold" or "forSale"
            item['prop_price']=str(page_tags.xpath(".//div[1]/dl[1]/dt[2]/strong/text()").extract())[3:-2]
            if (str(item['sale_sold'])=='recentlySold'):item['prop_price']=str(item['prop_price'])+str(page_tags.xpath(".//div[1]/dl[1]/dt[1]/strong/text()").extract())[3:-2]
            try:
                dollrsgn=item['prop_price'].index('$');item['prop_price']=str(item['prop_price'])[dollrsgn:]
            except:pass
            item['ad_title']=str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@title").extract())[3:-2]
            prop_latitude1=page_tags.xpath("@latitude").extract();item['prop_latitude']=str(prop_latitude1)[3:-8]+'.'+str(prop_latitude1)[5:-2]
            prop_longitude1=page_tags.xpath("@longitude").extract();item['prop_longitude']=str(prop_longitude1)[3:-8]+'.'+str(prop_longitude1)[7:-2]
            item['prop_address']=str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[1]/text()").extract())[3:-2]+', '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[2]/text()").extract())[3:-2]+', '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[3]/text()").extract())[3:-2]+'  '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[4]/text()").extract())[3:-2]
            mightmentionacres = str(page_tags.xpath(".//div[1]/dl[2]/dt[2]/text()").extract())[3:-2]+' | '+str(page_tags.xpath(".//div[1]/dl[2]/dt[2]/text()").extract())[3:-2]+' | '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@title").extract())[3:-2]+' | '#+str()[3:-2]#this last segment comes from full ad
            item['prop_acres'] = mightmentionacres

            #Here is where I'm talking about

            yield Request(str(item['ad_link']), meta={'csvfilename':csvfilename, 'item':item}, dont_filter=True, callback = self.getthispage)

            #By this point, I wanted all the callback[s] to have had executed, but they don't - Scrapy waits to launch them until after this function completes

            csvfile = open(csvfilename, 'ab')
            outwriter = csv.writer(csvfile, delimiter=';', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            outwriter.writerow(item['zpid'], [item['sale_sold'], item['prop_price'], item['ad_title'],
                                             item['prop_address'], item['prop_latitude'],
                                              item['prop_longitude'], item['prop_acres'],
                                               item['ad_link'], item['parcelnum'], item['lot_width']])
            csvfile.close()
        #retrieve href of next page of ads
        next_results_pg=1
        page_tags=hxs.xpath("//div[@id='list-container']/div[@id='search-pagination-wrapper-2']/ul[1]")
        while (str(page_tags.xpath(".//li["+str(next_results_pg)+"]/@class").extract())[3:-2]!='current'):
            next_results_pg+=1;
            if (next_results_pg>80):
                break
        next_results_pg+=1#;item['next_results_pg'] = next_results_pg
        if (str(page_tags.xpath(".//li["+str(next_results_pg)+"]/@class").extract())[3:-2]=='next'):return
        next_results_pg_href = prefixhref+str(page_tags.xpath(".//li["+str(next_results_pg)+"]/a/@href").extract())[3:-2]#
        if (next_results_pg_href != prefixhref):#need to also avoid launching pages otherwise not desired
            page = meowPage()
            page['next_results_pg_href'] = next_results_pg_href
            print 'Retrieving page '+ next_results_pg_href
#           yield Request(next_results_pg_href, dont_filter=True, callback = self.parse)
        return
#       if (item['next_results_pg_href']==prefixhref):
#           print 'No results pages found after this one, next+results_pg='+str(next_results_pg)
#       else:
#           print 'Next page to parse after this one is '+str(item['next_results_pg_href'])

    def getthispage(self, response): 
        #Even though the yield statement was used, 
        #nothing here really gets executed until
        #until the first parse function resumes and 
        #then finishes completely.
        return 

回答1:

I've rearranged your spider code a bit to make the "item in meta" a bit clearer (I hope)

from scrapy.spider import Spider
from scrapy.selector import Selector

from meow.items import meowItem, meowPage
from scrapy.http import Request
import urlparse
import pprint

class meowlistpage(Spider):
    name="melist"
    allowed_domains = ['meow.com']

    start_urls = ['http://www.meow.com'
                  '/homes/for_sale/CO'
                  '/10_rid/42.382894,-101.931152,35.496456,-109.171143_rect/6_zm/']

    def parse(self, response):
        self.log('First page retrieved')
        selector = Selector(response)

        # loop over the subroutine results, yielding each;
        # could be an Item or a Request
        for result in self.parse_page_articles(response, selector):
            yield result

        # look for next pages (except the one marked as "current")
        for next_page in selector.xpath("""
                //div[@id="search-pagination-wrapper-2"]
                    /ul/li[not(@class="current")]/a/@href""").extract():
            yield Request(urlparse.urljoin(response.url, next_page))

    def getthispage(self, response):
        selector = Selector(response)
        item = response.meta["item"]
        # self.log("in %s (getthispage:\nitem to complete \n%r" % (
        #    response.url, pprint.pformat(item)))
        #
        # continue extracting data from this page and store it in item
        # item["some_other_field"] = "info that was missing in listing page"
        # and when done, do:
        yield item

    def parse_page_articles(self, response, selector):
        page_tags = selector.xpath("//div[@id='search-results']/article")
        for article in page_tags:
            item = meowItem()
            item['ad_link'] = urlparse.urljoin(response.url,
                article.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@href").extract()[0]).encode('utf-8')

            idendplace = item['ad_link'].index('_zpid')-12
            backhashstr = item['ad_link'][idendplace:]
            idstartplace = backhashstr.index('/') + 1
            idendplace = len(backhashstr)-backhashstr.index('_zpid')

            item['zpid'] = backhashstr[idstartplace:-idendplace]
            item['sale_sold'] = article.xpath(".//div[1]/dl[1]/dt[1]/@class").extract()[0][8:-17]#"recentlySold" or "forSale"
            item['prop_price'] = article.xpath(".//div[1]/dl[1]/dt[2]/strong/text()").extract()[0][3:-2]
            if (item['sale_sold'] == 'recentlySold'):
                item['prop_price'] = item['prop_price'] + article.xpath(".//div[1]/dl[1]/dt[1]/strong/text()").extract()[0][3:-2]
            try:
                dollrsgn = item['prop_price'].index('$')
                item['prop_price'] = item['prop_price'][dollrsgn:]
            except:
                pass

            item['ad_title'] = article.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@title").extract()[0][3:-2]

            prop_latitude1 = article.xpath("@latitude").extract()
            item['prop_latitude'] = (str(prop_latitude1)[3:-8]
                + '.' + str(prop_latitude1)[5:-2])
            prop_longitude1 = article.xpath("@longitude").extract()
            item['prop_longitude'] = (
                str(prop_longitude1)[3:-8]
                + '.' + str(prop_longitude1)[7:-2])

            item['prop_address'] = (
                str(article.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[1]/text()").extract())[3:-2]
                + ', ' + str(article.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[2]/text()").extract())[3:-2]
                + ', ' + str(article.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[3]/text()").extract())[3:-2]
                + '  ' + str(article.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[4]/text()").extract())[3:-2])

            mightmentionacres = (
                str(article.xpath(".//div[1]/dl[2]/dt[2]/text()").extract())[3:-2]
                +' | '
                +str(article.xpath(".//div[1]/dl[2]/dt[2]/text()").extract())[3:-2]
                +' | '
                +str(article.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@title").extract())[3:-2]
                +' | '
                #+str()[3:-2]#this last segment comes from full ad
                )
            item['prop_acres'] = mightmentionacres

            yield Request(item['ad_link'], meta={'item':item},
                dont_filter=True, callback = self.getthispage)

Invoking your spider with scrapy crawl melist -o melist_items.csv -t csv should give you your items in CSV format



回答2:

My solution using the standard SQLite3 that is packaged with Python 2.7:

#  items.py contents:
#from scrapy.item import Item, Field

#class TrackItemScrapeItem(Item):
    # define the fields for your item here like:
#    f_1 = Field()
#    f_2 = Field()
#    sale_sold = Field()
#    price = Field()
#    item_ad_link = Field()
#    .and so on
#    .
#    .
#    .
#    <EOF>

#  TrackItemScrapespider.py contents:
from scrapy.spider import Spider
from scrapy.selector import Selector
from scrapy.contrib.spiders import CrawlSpider, Rule
from scrapy.contrib.exporter import CsvItemExporter
import csv
import sys
from zillow.items import TrackItemScrapeItem
from scrapy.http import Request
import os
import sqlite3 #allows scrapy asynchronous processes a global storage place for each item's scraped info
import time

class TrackItemScraper(Spider):
    name="buyitemslist"
    start_urls = ['http://www.buythisandthat.com']
    tablecolumns=""
    prikeyfldname='f_1'
    for field in getattr(TrackItemScrapeItem, 'fields'):
        # Just realize the order of these fields in the database has no relation to the order in items.py
        # nor is it consistent each time
        if (field==prikeyfldname):
            tablecolumns+=str(field)+' TEXT PRIMARY KEY NOT NULL, '
        else:
            tablecolumns+=str(field)+' TEXT, '
    tablecolumns=tablecolumns[:-2]
    con=None
    con=sqlite3.connect(name+".db");tablename='CrawlResults'
    if (con==None):
        print "SQL database not getting opened by sqlite3 !  (Is there room for the file ?)";sys.exit(1)
    cur=con.cursor()
    try:
        cur.execute('select * from '+tablename)
        cur.execute('PRAGMA table_info('+tablename+')')
        data = cur.fetchall()
        for d in data:
            print d[0], d[1], d[2]
        cur.execute('select * from '+tablename)
        print '\n'+str(cur.fetchall())
    except:
        cur.execute('DROP TABLE IF EXISTS '+tablename);cur.execute('CREATE TABLE '+tablename+' ('+tablecolumns+')')
    if (raw_input('\n\n Do you want to delete the previous '+name+'.CSV file?').capitalize()=='Y'):
        csvfile=name+'.csv'
        with open(csvfile, 'w') as csv_file:
            csv_file.close()
    if (raw_input('\n\n Do you want to save the results from the previous run to a new '+name+'.CSV file?').capitalize()=='Y'):
        csvfile=name+'.csv'
        with open(csvfile, 'w') as csv_file:
            csv_writer=csv.writer(csv_file)
            csv_writer.writerow([i[0] for i in cur.description])
            cur.execute('select * from '+tablename)
            csv_writer.writerows(cur)
            csv_file.close()
    if (raw_input('\n\n Do you want to clear out previous results from memory now to start clean?  Answer no ONLY if you haven\'t added new fields!').capitalize()=='Y'):
        cur.execute('DROP TABLE IF EXISTS '+tablename);cur.execute('CREATE TABLE '+tablename+' ('+tablecolumns+')')
    instancesrunning=1#start at 1 because a yield is about to happen implicitly for an asynchronous instance
#    seccntdwn=25#for failsafe counter, can used with next-to-last (for example) parse instance to tolerate an instance collision decrementing instancesrunning counter

    def parse(self, response):#recursive but first entry won't have meta args
        try:
            pageschema = response.meta['pageschema']
        except:#entered function without meta args, pageschema 0
            pageschema = 0
        hxs = Selector(response)
        if (pageschema==0):#top level pages
            ad_list=hxs.xpath("//xpath_to_ad_list")
        #    page_tags=''

            for item_ad in ad_list:
                item = TrackItemScrapeItem()
                # parse code for prikeyfldname field must be in here instead of these comment lines
                #item[prikeyfldname]=item_ad.xpath(".//div[whatever...
                # for this example, item['sale_sold'] and item['price'] will need parsing code in here as well
                con=None
                con=sqlite3.connect(self.name+".db")
                with sqlite3.connect(self.name+".db") as con:
                    cur=con.cursor()
                    replacevar = False
                    try:
                        cmd='INSERT INTO ' +str(self.tablename)+ ' (' +str(self.prikeyfldname)+ ') VALUES (\'' +str(item[self.prikeyfldname])+ '\')'
                        cur.execute(cmd)
                        print str(cmd) #won't see this unless insert succeeded 
                        con.commit()
                        #replacevar is for any fields for this item that you might want to keep old contents of, appending current info only when this item was not new in this run
                        replacevar = True
                        self.storthis(str(item[self.prikeyfldname]), 'sale_sold', str(item['sale_sold']), replace=replacevar)
                    except:#option example: if wanting to save old price, moves it into sale_sold if sale_sold field has changed
                        cmd='UPDATE ' +self.tablename+ ' SET sale_sold = \''  +str(item['sale_sold'])+ ', last retrieved advertised price was \' || (SELECT price) WHERE ' +str(self.prikeyfldname)+ ' = ' +str(item[self.prikeyfldname])+ ' AND sale_sold <> \'' +str(item['sale_sold'])+ '\''
                        print str(cmd)#prints even if SQL update fails for troubleshooting
                        cur.execute(cmd)
                        con.commit()
                #now storthis code for price field because we already parsed it
                self.storthis( str(item[self.prikeyfldname]),'price',item['price'], replace=True)
                #
                #remainder of parses and stores in here for this item, saving the ad_link for next yield/Request
                #
                self.instancesrunning+=1    
                yield Request(str(item['ad_link']), meta={'item':item, 'pageschema':1}, dont_filter=True, callback = self.parse)
            #code in here to determine link for next page of list of ads
            if (final_page_of_list_is_complete):
                self.savetofileiflastinstance()#the way to exit all callback instances
                return
            #parse link to next page in here if not done already
            time.sleep(6)#being nice to host
            self.instancesrunning+=1
            yield Request(next_results_pg_href, dont_filter=True, callback = self.parse)#don't need meta for list pages
        elif (pageschema==1): #we are to parse using 2nd schema
            item = response.meta['item']
            page_tags=hxs.xpath("//as you need for this schema"
            #parse and store in here, just remember in this example that we didn't pass along in meta whether this item is new or not this run
            for (every link on this page needing scraped but adjust pageschema for each type):
                self.instancesrunning+=1    
                yield Request(str(item['ad_link']), meta={'item':item, 'pageschema':as_needed}, dont_filter=True, callback = self.parse)
        elif (pageschema==2): #same general code in here as for schema 1
        elif (pageschema==3): #same general code in here as for schema 1
        elif (pageschema==4): #same general code in here as for schema 1
        self.savetofileiflastinstance()
        return

    def storthis (self, uniquefieldvalue, storfieldname, storfieldvalue, replace):
#       check for nulls etc in here, if desired  
        con=None
        con=sqlite3.connect(self.name+".db")
        if (replace==False):
            cmd='UPDATE '+str(self.tablename)+' SET '+storfieldname+' = (SELECT '+storfieldname+') || \''+storfieldvalue+'\' WHERE '+self.prikeyfldname+'=\''+uniquefieldvalue+'\''
        else:
            cmd='UPDATE '+str(self.tablename)+' SET '+storfieldname+'=\''+storfieldvalue+'\' where '+self.prikeyfldname+'=\''+uniquefieldvalue+'\''
        print str(cmd)
        try:
            with con:
                cur=con.cursor()
                cur.execute(cmd)
                con.commit()
        except:pass##we don't want to return with an error unexcepted
        return 

    def savetofileiflastinstance(self):
#        instancesrunningpre=self.instancesrunning #if utilizing the failsafe instance counting
        self.instancesrunning-=1
        if (self.instancesrunning>0):pass #or failsafe counter code in here
#            #if concerned that this might still be the last if an instance collision ever occurred while decrementing self.instancesrunning
#             we'll wait seccntdwn seconds for another instance to have its presence indicated by a change in the counter
#            if ():
                self.savetofileiflastinstance()
        else:   #this was last instance, write the csv file
            con=None
            con=sqlite3.connect(self.name+".db")
            cur=con.cursor()
            csvfile=self.name+'.csv'
            with open(csvfile, 'a') as csv_file:
                cur.execute('select * from '+self.tablename)
                csv_writer=csv.writer(csv_file)
                csv_writer.writerow([i[0] for i in cur.description])
                csv_writer.writerows(cur)
                csv_file.close()
            print '\n\nWrote the results to the '+self.name+'.CSV file.  Make sure this is the LAST line of output from this sript!  If it isn\'t, rewrite the source coding for instance tracking.\n\n'
            con.commit()
        return