我花了一整天的时间寻找在Python尽可能简单的多线程URL抓取工具,但是我发现大多数脚本使用队列或多重或复杂的库。
最后,我写了一个自己,我感到作为一个答案报告。 请随时提出任何改善。
我想其他人可能一直在寻找类似的东西。
我花了一整天的时间寻找在Python尽可能简单的多线程URL抓取工具,但是我发现大多数脚本使用队列或多重或复杂的库。
最后,我写了一个自己,我感到作为一个答案报告。 请随时提出任何改善。
我想其他人可能一直在寻找类似的东西。
尽量简化你的原始版本:
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
def fetch_url(url):
urlHandler = urllib2.urlopen(url)
html = urlHandler.read()
print "'%s\' fetched in %ss" % (url, (time.time() - start))
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print "Elapsed Time: %s" % (time.time() - start)
这里唯一的新招数:
join
已经告诉你。 Thread
子类,只是一个target
函数。 multiprocessing
有一个线程池,不启动其他进程:
#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
def fetch_url(url):
try:
response = urlopen(url)
return url, response.read(), None
except Exception as e:
return url, None, e
start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
if error is None:
print("%r fetched in %ss" % (url, timer() - start))
else:
print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))
优点相比, Thread
为基础的解决方案:
ThreadPool
允许限制(并发连接的最大数目20
中的代码示例) from urllib.request import urlopen
关于Python 3)。 在这个主要的例子concurrent.futures
你想要做的一切,有很多更简单。 此外,它可以通过一次只能做5处理的URL的数量巨大,而且它更漂亮地处理错误。
当然,这个模块仅建立在与Python 3.2或更高版本...但如果你使用2.5-3.1,你可以只安装反向移植, futures
,掉的PyPI。 所有你需要从示例代码改变是搜索和替换concurrent.futures
与futures
,而且,对于2.x中, urllib.request
用urllib2
。
这里的回迁到2.X的样本,修改为使用您的网址列表,并添加时间:
import concurrent.futures
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
# Retrieve a single page and report the url and contents
def load_url(url, timeout):
conn = urllib2.urlopen(url, timeout=timeout)
return conn.readall()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print '%r generated an exception: %s' % (url, exc)
else:
print '"%s" fetched in %ss' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)
但是你可以让这个更简单。 真的,你需要的是:
def load_url(url):
conn = urllib2.urlopen(url, timeout)
data = conn.readall()
print '"%s" fetched in %ss' % (url,(time.time() - start))
return data
with futures.ThreadPoolExecutor(max_workers=5) as executor:
pages = executor.map(load_url, urls)
print "Elapsed Time: %ss" % (time.time() - start)
我现在发布不同的解决方案,由具有工作线程未守护进程,并将它们加入到主线程 (这意味着阻挡主线程,直到所有的工作线程已完成),而不是通知每个工作线程的执行结束时用回调到一个全局函数(如我在前面的答案一样),如一些评论也指出,这种方式不是线程安全的。
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
class FetchUrl(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.url = url
def run(self):
urlHandler = urllib2.urlopen(self.url)
html = urlHandler.read()
print "'%s\' fetched in %ss" % (self.url,(time.time() - start))
for url in urls:
FetchUrl(url).start()
#Join all existing threads to main thread.
for thread in threading.enumerate():
if thread is not threading.currentThread():
thread.join()
print "Elapsed Time: %s" % (time.time() - start)
此脚本获取从一组在一个数组中定义URL的内容。 它生成每个URL被取一个线程,所以它的目的是用于一组有限的URL。
而不是使用一个队列对象,每个线程通知其一端与一个回调到一个全球性的功能,这使运行的线程数的计数。
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)
class FetchUrl(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.setDaemon = True
self.url = url
def run(self):
urlHandler = urllib2.urlopen(self.url)
html = urlHandler.read()
finished_fetch_url(self.url)
def finished_fetch_url(url):
"callback function called when a FetchUrl thread ends"
print "\"%s\" fetched in %ss" % (url,(time.time() - start))
global left_to_fetch
left_to_fetch-=1
if left_to_fetch==0:
"all urls have been fetched"
print "Elapsed Time: %ss" % (time.time() - start)
for url in urls:
"spawning a FetchUrl thread for each url to fetch"
FetchUrl(url).start()