A very simple multithreading parallel URL fetching

2019-01-06 09:45发布

问题:

I spent a whole day looking for the simplest possible multithreaded URL fetcher in Python, but most scripts I found are using queues or multiprocessing or complex libraries.

Finally I wrote one myself, which I am reporting as an answer. Please feel free to suggest any improvement.

I guess other people might have been looking for something similar.

回答1:

Simplifying your original version as far as possible:

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    urlHandler = urllib2.urlopen(url)
    html = urlHandler.read()
    print "'%s\' fetched in %ss" % (url, (time.time() - start))

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print "Elapsed Time: %s" % (time.time() - start)

The only new tricks here are:

  • Keep track of the threads you create.
  • Don't bother with a counter of threads if you just want to know when they're all done; join already tells you that.
  • If you don't need any state or external API, you don't need a Thread subclass, just a target function.


回答2:

multiprocessing has a thread pool that doesn't start other processes:

#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen

urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    try:
        response = urlopen(url)
        return url, response.read(), None
    except Exception as e:
        return url, None, e

start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
    if error is None:
        print("%r fetched in %ss" % (url, timer() - start))
    else:
        print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))

The advantages compared to Thread-based solution:

  • ThreadPool allows to limit the maximum number of concurrent connections (20 in the code example)
  • the output is not garbled because all output is in the main thread
  • errors are logged
  • the code works on both Python 2 and 3 without changes (assuming from urllib.request import urlopen on Python 3).


回答3:

The main example in the concurrent.futures does everything you want, a lot more simply. Plus, it can handle huge numbers of URLs by only doing 5 at a time, and it handles errors much more nicely.

Of course this module is only built in with Python 3.2 or later… but if you're using 2.5-3.1, you can just install the backport, futures, off PyPI. All you need to change from the example code is to search-and-replace concurrent.futures with futures, and, for 2.x, urllib.request with urllib2.

Here's the sample backported to 2.x, modified to use your URL list and to add the times:

import concurrent.futures
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    conn = urllib2.urlopen(url, timeout=timeout)
    return conn.readall()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print '%r generated an exception: %s' % (url, exc)
        else:
            print '"%s" fetched in %ss' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)

But you can make this even simpler. Really, all you need is:

def load_url(url):
    conn = urllib2.urlopen(url, timeout)
    data = conn.readall()
    print '"%s" fetched in %ss' % (url,(time.time() - start))
    return data

with futures.ThreadPoolExecutor(max_workers=5) as executor:
    pages = executor.map(load_url, urls)

print "Elapsed Time: %ss" % (time.time() - start)


回答4:

I am now publishing a different solution, by having the worker threads not-deamon and joining them to the main thread (which means blocking the main thread until all worker threads have finished) instead of notifying the end of execution of each worker thread with a callback to a global function (as I did in the previous answer), as in some comments it was noted that such way is not thread-safe.

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        print "'%s\' fetched in %ss" % (self.url,(time.time() - start))

for url in urls:
    FetchUrl(url).start()

#Join all existing threads to main thread.
for thread in threading.enumerate():
    if thread is not threading.currentThread():
        thread.join()

print "Elapsed Time: %s" % (time.time() - start)


回答5:

This script fetches the content from a set of URLs defined in an array. It spawns a thread for each URL to be fetch, so it is meant to be used for a limited set of URLs.

Instead of using a queue object, each thread is notifying its end with a callback to a global function, which keeps count of the number of threads running.

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.setDaemon = True
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        finished_fetch_url(self.url)


def finished_fetch_url(url):
    "callback function called when a FetchUrl thread ends"
    print "\"%s\" fetched in %ss" % (url,(time.time() - start))
    global left_to_fetch
    left_to_fetch-=1
    if left_to_fetch==0:
        "all urls have been fetched"
        print "Elapsed Time: %ss" % (time.time() - start)


for url in urls:
    "spawning a FetchUrl thread for each url to fetch"
    FetchUrl(url).start()