Scraping concurrently with selenium in python

2019-05-14 10:58发布

问题:

I am trying to scrape concurrently with the selenium and multiprocessing modules. Below is roughly my approach:

  • create a queue holding as many webdriver instances as there are workers
  • create a pool of workers
  • each worker pulls a webdriver instance from the queue
  • when the function terminates, the webdriver instance is put back on the queue

Here is the code:

#!/usr/bin/env python
# encoding: utf-8

import time
import codecs
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from multiprocessing import Pool
from Queue import Queue


def download_and_save(link_tuple):
    """Fetch one page with a pooled webdriver and save its HTML to disk.

    Args:
        link_tuple: ``(link_id, link)`` pair. The page source of ``link`` is
            written, UTF-8 encoded, to ``<link_id>.html``.

    The driver is taken from the module-level queue ``q`` and is always put
    back, even when loading or saving the page raises, so one failing link
    does not permanently remove a driver from the queue.
    """
    link_id, link = link_tuple
    print(link_id)
    w = q.get()
    try:
        w.get(link)
        with codecs.open('%s.html' % link_id, 'w', encoding='utf-8') as f:
            f.write(w.page_source)
        # Pause before handing the driver back (kept from the original flow).
        time.sleep(10)
    finally:
        # Return the driver no matter what happened above.
        q.put(w)


def main(num_processes):
    """Download a fixed list of links with a pool of worker processes.

    Each link is paired with its position in the list; that index is what
    ``download_and_save`` uses to name the output file.

    Args:
        num_processes: number of worker processes to start in the pool.
    """
    links = [
        'http://openjurist.org/743/f2d/273/zwiener-v-commissioner-of-internal-revenue',
        'http://www.oyez.org/advocates/z/l/lonny_f_zwiener',
        'http://www.texasbar.com/attorneys/member.cfm?id=21191',
        'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/cited-by',
        'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/authorities/',
        'http://www.myheritage.com/names/lonny_zwiene',
        'https://law.resource.org/pub/us/case/reporter/F2/743/743.F2d.273.84-4068.htm',
        'http://www.ancestry.com/1940-census/usa/Texas/Lonny-F-Zwiener_5bbff',
        'http://search.ancestry.com/cgi-bin/sse.dll?gl=34&rank=1&new=1&so=3&MSAV=0&msT=1&gss=ms_f-34&gl=bmd_death&rank=1&new=1&so=1&MSAV=0&msT=1&gss=ms_f-2_s&gsfn=Lonny&gsln=Zwiener&msypn__ftp=T',
        'http://www.mocavo.com/Lonny-F-Zwiener-Fredericksburg-Gillespie-Texas-1940-United-States-Census/0798164756456805432',
        'http://www.taftlaw.com/attorneys/635-mark-s-yuric'
    ]
    # (index, link) pairs; the index doubles as the output file name.
    link_tuples = list(enumerate(links))
    pool = Pool(num_processes)
    try:
        pool.map(download_and_save, link_tuples)
    finally:
        # map() blocks until every task has finished, so on success there is
        # no outstanding work to lose; on failure this stops the workers
        # instead of leaving them running.
        pool.terminate()
        pool.join()


if __name__ == '__main__':
    num_processes = 2
    # NOTE(review): `Queue.Queue` synchronizes threads inside one process; it
    # is not a cross-process queue, so the Pool workers presumably do not
    # share this instance or the drivers stored in it -- confirm before
    # relying on it to hand out one driver per worker.
    q = Queue()
    dcap = dict(DesiredCapabilities.PHANTOMJS)
    dcap["phantomjs.page.settings.userAgent"] = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36"
    )
    # Track every driver that was started so all of them can be shut down.
    drivers = []
    try:
        for i in range(num_processes):
            w = webdriver.PhantomJS(desired_capabilities=dcap)
            drivers.append(w)
            q.put(w)
        main(num_processes)
    finally:
        # Quit the browsers even if start-up or scraping failed part-way.
        for w in drivers:
            w.quit()

This script runs, but the saved HTML files are either duplicated or missing.

回答1:

Here is a different approach that I've had success with: you keep your workers in __main__, and the workers pull from the task_q.

import multiprocessing
import traceback

class scrapeWorker(multiprocessing.Process):
    """Worker process that runs scraping tasks pulled from a task queue.

    A task is a two-item ``(method_name, kwargs)`` tuple or list. The named
    method is looked up on this worker's scraper object, called with
    ``**kwargs``, and its return value is put on the result queue. The string
    ``'KILL'`` on the task queue tells the worker to stop.
    """

    def __init__(self, worker_num, task_q, result_q):
        """Store the queues and build the scraper object.

        Args:
            worker_num: identifier given to this worker by its creator.
            task_q: queue the worker reads tasks from.
            result_q: queue the worker puts task results on.
        """
        super(scrapeWorker, self).__init__()
        self.worker_num = worker_num
        self.task_q = task_q
        self.result_q = result_q

        self.scraper = my_scraper_class() # this contains driver code, methods, etc.

    def handleWork(self, work):
        """Run a single task on the scraper and publish its result.

        Args:
            work: ``(method_name, kwargs)`` tuple or list, where ``kwargs``
                is a dict of keyword arguments for the scraper method.

        Raises:
            TypeError: if ``work`` is not a tuple/list or ``work[1]`` is not
                a dict.
            ValueError: if ``work`` does not have exactly two items.
        """
        # Explicit raises instead of ``assert`` so the validation still runs
        # when Python is started with -O.
        if not isinstance(work, (tuple, list)):
            raise TypeError("work should be a tuple or list. found {}".format(type(work)))
        if len(work) != 2:
            raise ValueError("len(work) != 2. found {}".format(work))
        if not isinstance(work[1], dict):
            raise TypeError("work[1] should be a dict. found {}".format(type(work[1])))

        # do the work
        method_name, kwargs = work
        result = getattr(self.scraper, method_name)(**kwargs)

        self.result_q.put(result)

    # worker.run() is actually called via worker.start()
    def run(self):
        """Start the driver, then process tasks until ``'KILL'`` is received.

        Any exception is printed with its traceback and re-raised. The
        scraper's driver is quit on every exit path, so a failing task does
        not leave a browser running.
        """
        try:
            self.scraper.startDriving()

            while True:
                work = self.task_q.get()

                if work == 'KILL':
                    break

                self.handleWork(work)
        except Exception:
            print(traceback.format_exc())

            raise
        finally:
            # The driver may not exist if startDriving() itself failed.
            driver = getattr(self.scraper, 'driver', None)
            if driver is not None:
                driver.quit()

if __name__ == "__main__":
    num_workers = 4

    # Manager-backed queues are shared between this process and the workers.
    manager = multiprocessing.Manager()
    task_q = manager.Queue()
    result_q = manager.Queue()

    workers = []
    try:
        for worker_num in range(num_workers):
            worker = scrapeWorker(worker_num, task_q, result_q)
            worker.start()
            workers.append(worker)

        # you decide what job_stuff is
        # work == [ 'method_name', {'kw_1': val_1, ...} ]
        for work in job_stuff:
            task_q.put(work)

        # Collect exactly one result per submitted job.
        # NOTE(review): a worker that dies on a failing task never posts a
        # result, so this loop would wait forever in that case -- confirm
        # whether a timeout or liveness check is wanted here.
        results = []
        while len(results) < len(job_stuff):
            results.append(result_q.get())
    finally:
        # Always tell every started worker to stop and wait for it, even if
        # something above raised; otherwise the workers keep running.
        for worker in workers:
            task_q.put("KILL")

        for worker in workers:
            worker.join()

    print("finished!")


####