gevent / requests hangs while making lots of head

2020-05-29 06:01发布

I need to make 100k head requests, and I'm using gevent on top of requests. My code runs for a while, but then eventually hangs. I'm not sure why it's hanging, or whether it's hanging inside requests or gevent. I'm using the timeout argument inside both requests and gevent.

Please take a look at my code snippet below, and let me know what I should change.

import gevent
from gevent import monkey, pool
monkey.patch_all()
import requests

def get_head(url, timeout=3):
    try:
        return requests.head(url, allow_redirects=True, timeout=timeout)
    except:
        return None

def expand_short_urls(short_urls, chunk_size=100, timeout=60*5):
    chunk_list = lambda l, n: ( l[i:i+n] for i in range(0, len(l), n) )
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)
    results = {}
    for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)):
        print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now()))
        jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked]
        gevent.joinall(jobs, timeout=timeout)
        results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200})
    return results 

I've tried grequests, but it's been abandoned, and I've gone through the github pull requests, but they all have issues too.

2条回答
Ridiculous、
2楼-- · 2020-05-29 06:45

I'm not sure if this will resolve your issue, but you are not using pool.Pool() correctly.

Try this:

def expand_short_urls(short_urls, chunk_size=100):
    # Pool() automatically limits your process to chunk_size greenlets running concurrently
    # thus you don't need to do all that chunking business you were doing in your for loop
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)

    # spawn() (both gevent.spawn() and Pool.spawn()) returns a gevent.Greenlet object
    # NOT the value your function, get_head, will return
    threads = [p.spawn(get_head, short_url) for short_url in short_urls]
    p.join()

    # to access the returned value of your function, access the Greenlet.value property
    results = {short_url: thread.value.url for short_url, thread in zip(short_urls, threads) 

if thread.value is not None and thread.value.status_code == 200} return results

查看更多
一夜七次
3楼-- · 2020-05-29 06:46

The RAM usage you are observing mainly stems from all the data that piles up while storing 100.000 response objects, and all the underlying overhead. I have reproduced your application case, and fired off HEAD requests against 15000 URLS from the top Alexa ranking. It did not really matter

  • whether I used a gevent Pool (i.e. one greenlet per connection) or a fixed set of greenlets, all requesting multiple URLs
  • how large I set the pool size

In the end, the RAM usage grew over time, to considerable amounts. However, I noticed that changing from requests to urllib2 already lead to a reduction in RAM usage, by about factor two. That is, I replaced

result = requests.head(url)

with

request = urllib2.Request(url)
request.get_method = lambda : 'HEAD'
result = urllib2.urlopen(request)

Some other advice: do not use two timeout mechanisms. Gevent's timeout approach is very solid, and you can easily use it like this:

def gethead(url):
    result = None
    try:
        with Timeout(5, False):
            result = requests.head(url)
    except Exception as e:
        result = e
    return result

Might look tricky, but either returns None (after quite precisely 5 seconds, and indicates timeout), any exception object representing a communication error, or the response. Works great!

Although this likely is not part of the issue, in such cases I recommend to keep workers alive and let them work on multiple items each! The overhead of spawning greenlets is small, indeed. Still, this would be a very simple solution with a set of long-lived greenlets:

def qworker(qin, qout):
    while True:
        try:
            qout.put(gethead(qin.get(block=False)))
        except Empty:
            break

qin = Queue()
qout = Queue()

for url in urls:
    qin.put(url)

workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)]
joinall(workers)
returnvalues = [qout.get() for _ in xrange(len(urls))]

Also, you really need to appreciate that this is a large-scale problem you are tackling there, yielding non-standard issues. When I reproduced your scenario with a timeout of 20 s and 100 workers and 15000 URLs to be requested, I easily got a large number of sockets:

# netstat -tpn | wc -l
10074

That is, the OS had more than 10000 sockets to manage, most of them in TIME_WAIT state. I also observed "Too many open files" errors, and tuned the limits up, via sysctl. When you request 100.000 URLs you will probably hit such limits, too, and you need to come up with measures to prevent system starving.

Also note the way you are using requests, it automatically follows redirects from HTTP to HTTPS, and automatically verifies the certificate, all of which surely costs RAM.

In my measurements, when I divided the number of requested URLs by the runtime of the program, I almost never passed 100 responses/s, which is the result of the high-latency connections to foreign servers all over the world. I guess you also are affected by such a limit. Adjust the rest of the architecture to this limit, and you will probably be able to generate a data stream from the Internet to disk (or database) with not so large RAM usage inbetween.

I should address your two main questions, specifically:

I think gevent/the way you are using it is not your problem. I think you are just underestimating the complexity of your task. It comes along with nasty problems, and drives your system to its limits.

  • your RAM usage issue: Start off by using urllib2, if you can. Then, if things accumulate still too high, you need to work against accumulation. Try to produce a steady state: you might want to start writing off data to disk and generally work towards the situation where objects can become garbage collected.

  • your code "eventually hangs": probably this is as of your RAM issue. If it is not, then do not spawn so many greenlets, but reuse them as indicated. Also, further reduce concurrency, monitor the number of open sockets, increase system limits if necessary, and try to find out exactly where your software hangs.

查看更多
登录 后发表回答