Example of urllib3 and threading in Python

Posted 2019-04-02 02:56

Question:

I am trying to use urllib3 with a simple thread pool to fetch several wiki pages. The script creates one connection for every thread (I don't understand why) and then hangs forever. Any tip, advice, or simple example of urllib3 with threading would be appreciated.

import threadpool
from urllib3 import connection_from_url

HTTP_POOL = connection_from_url(url, timeout=10.0, maxsize=10, block=True)

def fetch(url, fields):
  kwargs={'retries':6}
  return HTTP_POOL.get_url(url, fields, **kwargs)

pool = threadpool.ThreadPool(5)
requests = threadpool.makeRequests(fetch, iterable)
[pool.putRequest(req) for req in requests]

@Lennart's script got this error:

http://en.wikipedia.org/wiki/2010-11_Premier_LeagueTraceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 156, in run
 http://en.wikipedia.org/wiki/List_of_MythBusters_episodeshttp://en.wikipedia.org/wiki/List_of_Top_Gear_episodes http://en.wikipedia.org/wiki/List_of_Unicode_characters    result = request.callable(*request.args, **request.kwds)
  File "crawler.py", line 9, in fetch
    print url, conn.get_url(url)
AttributeError: 'HTTPConnectionPool' object has no attribute 'get_url'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 156, in run
    result = request.callable(*request.args, **request.kwds)
  File "crawler.py", line 9, in fetch
    print url, conn.get_url(url)
AttributeError: 'HTTPConnectionPool' object has no attribute 'get_url'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 156, in run
    result = request.callable(*request.args, **request.kwds)
  File "crawler.py", line 9, in fetch
    print url, conn.get_url(url)
AttributeError: 'HTTPConnectionPool' object has no attribute 'get_url'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 156, in run
    result = request.callable(*request.args, **request.kwds)
  File "crawler.py", line 9, in fetch
    print url, conn.get_url(url)
AttributeError: 'HTTPConnectionPool' object has no attribute 'get_url'

After adding import threadpool; import urllib3 and tpool = threadpool.ThreadPool(4), @user318904's code got this error:

Traceback (most recent call last):
  File "crawler.py", line 21, in <module>
    tpool.map_async(fetch, urls)
AttributeError: ThreadPool instance has no attribute 'map_async'

Answer 1:

Obviously it will create one connection per thread; how else should each thread be able to fetch a page? You are also trying to use the same connection, made from one URL, for all URLs. That can hardly be what you intended.

This code worked just fine:

import threadpool
from urllib3 import connection_from_url

def fetch(url):
  kwargs={'retries':6}
  conn = connection_from_url(url, timeout=10.0, maxsize=10, block=True)
  print url, conn.get_url(url)
  print "Done!"

pool = threadpool.ThreadPool(4)
urls = ['http://en.wikipedia.org/wiki/2010-11_Premier_League',
        'http://en.wikipedia.org/wiki/List_of_MythBusters_episodes',
        'http://en.wikipedia.org/wiki/List_of_Top_Gear_episodes',
        'http://en.wikipedia.org/wiki/List_of_Unicode_characters',
        ]
requests = threadpool.makeRequests(fetch, urls)

[pool.putRequest(req) for req in requests]
pool.wait()
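
The traceback in the question shows that newer urllib3 releases have no get_url method. Below is a minimal Python 3 sketch of the same idea, not the answerer's code: it swaps get_url for request("GET", url), swaps the third-party threadpool package for the standard threading and queue modules, and shares one PoolManager between threads instead of building a pool per URL. The names fetch_all, worker, and make_http are hypothetical helpers introduced only for illustration.

```python
import queue
import threading


def fetch(http, url):
    """Fetch one URL; return (url, HTTP status, body length in bytes)."""
    response = http.request("GET", url, retries=6)
    return url, response.status, len(response.data)


def worker(http, tasks, results):
    """Pull URLs off the task queue until a None sentinel arrives."""
    while True:
        url = tasks.get()
        if url is None:
            break
        try:
            results.put(fetch(http, url))
        except Exception as exc:
            # Report the failure instead of silently killing the thread.
            results.put((url, None, exc))


def fetch_all(http, urls, num_threads=4):
    """Fetch all URLs with num_threads threads sharing one http object."""
    tasks = queue.Queue()
    results = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(http, tasks, results))
        for _ in range(num_threads)
    ]
    for thread in threads:
        thread.start()
    for url in urls:
        tasks.put(url)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker so every thread exits
    for thread in threads:
        thread.join()  # wait for the workers, like pool.wait() above
    return [results.get() for _ in range(results.qsize())]


def make_http(num_threads=4):
    """Build a shared PoolManager (assumption: urllib3 is installed)."""
    import urllib3  # imported lazily; the helpers above only need request()
    return urllib3.PoolManager(maxsize=num_threads, block=True)
```

Calling fetch_all(make_http(), urls) with the urls list from the answer should return one tuple per URL, in completion order rather than input order. A failed request shows up as (url, None, exception) rather than as a traceback printed from inside a thread.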


Answer 2:

Thread programming is hard, so I wrote workerpool to make exactly what you're doing easier.

More specifically, see the Mass Downloader example.

To do the same thing with urllib3, it looks something like this:

import urllib3
import workerpool

# Named http_pool so the worker pool created below does not shadow it
http_pool = urllib3.connection_from_url("foo", maxsize=3)

def download(url):
    r = http_pool.get_url(url)
    # TODO: Do something with r.data
    print "Downloaded %s" % url

# Initialize a pool, 5 threads in this case
pool = workerpool.WorkerPool(size=5)

# The ``download`` method will be called with a line from the second 
# parameter for each job.
pool.map(download, open("urls.txt").readlines())

# Send shutdown jobs to all threads, and wait until all the jobs have been completed
pool.shutdown()
pool.wait()

For more sophisticated code, have a look at workerpool.EquippedWorker (and the tests here for example usage). You can make the pool be the toolbox you pass in.



Answer 3:

Here is my take: a more current solution using Python 3 and concurrent.futures.ThreadPoolExecutor.

import urllib3
from concurrent.futures import ThreadPoolExecutor

urls = ['http://en.wikipedia.org/wiki/2010-11_Premier_League',
        'http://en.wikipedia.org/wiki/List_of_MythBusters_episodes',
        'http://en.wikipedia.org/wiki/List_of_Top_Gear_episodes',
        'http://en.wikipedia.org/wiki/List_of_Unicode_characters',
        ]

def download(url, cmanager):
    response = cmanager.request('GET', url)
    if response and response.status == 200:
        print("+++++++++ url: " + url)
        print(response.data[:1024])

connection_mgr = urllib3.PoolManager(maxsize=5)
thread_pool = ThreadPoolExecutor(5)
for url in urls:
    thread_pool.submit(download, url, connection_mgr)

Some remarks

  • My code is based on a similar example from the Python Cookbook by Beazley and Jones.
  • I particularly like the fact that you only need a standard module besides urllib3.
  • The setup is extremely simple, and if you are only going for side-effects in download (like printing, saving to a file, etc.), there is no additional effort in joining the threads.
  • If you want something different, ThreadPoolExecutor.submit returns a Future; calling its result() method gives you whatever download returned (or re-raises the exception it raised).
  • I found it helpful to align the number of threads in the thread pool with the number of connections in the connection pool (via maxsize). Otherwise you might encounter (harmless) warnings when all threads try to access the same server (as in the example).
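
The remarks about Futures and about matching maxsize to the thread count can be sketched as follows. This is an illustration under stated assumptions, not part of the original answer: download_all and NUM_WORKERS are hypothetical names, download is changed to return its data instead of printing it, and http is any object with a urllib3-style request() method, such as the PoolManager above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Assumption: use this one number for both the thread pool and the
# connection pool (maxsize), as the last remark suggests.
NUM_WORKERS = 5


def download(url, http):
    """Return (status, body) instead of printing, so the caller gets a value."""
    response = http.request("GET", url)
    return response.status, response.data


def download_all(urls, http, workers=NUM_WORKERS):
    """Submit one job per URL and collect each Future's outcome by URL."""
    results = {}
    # Leaving the with-block waits for all submitted jobs to finish.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_url = {
            executor.submit(download, url, http): url for url in urls
        }
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()  # value returned by download
            except Exception as exc:
                results[url] = exc  # exception raised inside download
    return results


# Usage sketch (needs urllib3 and network access, so it is left commented):
#   import urllib3
#   http = urllib3.PoolManager(maxsize=NUM_WORKERS)
#   pages = download_all(urls, http)
```

Storing the exception next to the successful results is one design choice among several; re-raising on the first failure would be equally reasonable if a partial result is of no use.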


Answer 4:

I use something like this:

#excluding setup for threadpool etc

upool = urllib3.HTTPConnectionPool('en.wikipedia.org', block=True)

urls = ['/wiki/2010-11_Premier_League',
        '/wiki/List_of_MythBusters_episodes',
        '/wiki/List_of_Top_Gear_episodes',
        '/wiki/List_of_Unicode_characters',
        ]

def fetch(path):
    # add error checking
    return upool.get_url(path).data

tpool = ThreadPool()

tpool.map_async(fetch, urls)

# either wait on the result object or give map_async a callback function for the results
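
The answer leaves out the ThreadPool setup, and the question shows that threadpool.ThreadPool has no map_async. The sketch below assumes the standard library's multiprocessing.pool.ThreadPool was meant, which does provide map_async; that is a guess, not something the answer states. fetch_paths is a hypothetical helper, request("GET", path) stands in for get_url, and http_pool is any object with a urllib3-style request() method, for example an HTTPConnectionPool.

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def fetch(http_pool, path):
    """Fetch one path from the pool's host and return the response body."""
    # add error checking
    return http_pool.request("GET", path).data


def fetch_paths(http_pool, paths, num_threads=4, timeout=60, callback=None):
    """Fetch all paths concurrently; results come back in input order."""
    with ThreadPool(num_threads) as tpool:
        async_result = tpool.map_async(
            partial(fetch, http_pool),
            paths,
            callback=callback,  # optional: called once with the full result list
        )
        # Waiting on the result object: blocks until every path is fetched,
        # re-raises an exception from fetch, or times out.
        return async_result.get(timeout=timeout)
```

This covers both options from the closing comment: fetch_paths waits on the result object itself, and a function passed as callback receives the complete list of bodies once all jobs have finished.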