Limiting/throttling the rate of HTTP requests in G

Posted 2019-03-08 11:04

Question:

I'm writing a small script in Python 2.7.3 with GRequests and lxml to gather collectible card prices from various websites and compare them. The problem is that one of the websites limits the number of requests and returns HTTP error 429 if I exceed the limit.

Is there a way to throttle requests in GRequests so that I don't exceed a number of requests per second that I specify? Also, how can I make GRequests retry after some time if HTTP 429 occurs?

On a side note - their limit is ridiculously low. Something like 8 requests per 15 seconds. I breached it with my browser on multiple occasions just refreshing the page waiting for price changes.

Answer 1:

I'm going to answer my own question, since I had to figure this out myself and there seems to be very little information on it.

The idea is as follows. Every request object used with GRequests can take a session object as a parameter when it is created. Session objects, in turn, can have HTTP adapters mounted that are used when making requests. By creating our own adapter, we can intercept requests and rate-limit them in whatever way suits our application. In my case, I ended up with the code below.

Object used for throttling:

import datetime

DEFAULT_BURST_WINDOW = datetime.timedelta(seconds=5)
DEFAULT_WAIT_WINDOW = datetime.timedelta(seconds=15)


class BurstThrottle(object):
    max_hits = None
    hits = None
    burst_window = None
    total_window = None
    timestamp = None

    def __init__(self, max_hits, burst_window, wait_window):
        self.max_hits = max_hits
        self.hits = 0
        self.burst_window = burst_window
        self.total_window = burst_window + wait_window
        self.timestamp = datetime.datetime.min

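    def throttle(self):
        """Return how long the caller must wait (zero if it may send now)."""
        now = datetime.datetime.utcnow()
        if now < self.timestamp + self.total_window:
            # Still inside the current burst + wait cycle.
            if (now < self.timestamp + self.burst_window) and (self.hits < self.max_hits):
                # Burst window is open and quota is left: allow the request.
                self.hits += 1
                return datetime.timedelta(0)
            else:
                # Quota used up or burst window over: wait for the cycle to end.
                return self.timestamp + self.total_window - now
        else:
            # Cycle finished: start a new one with this request as the first hit.
            self.timestamp = now
            self.hits = 1
            return datetime.timedelta(0)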

HTTP adapter:

import gevent
import requests


class MyHttpAdapter(requests.adapters.HTTPAdapter):
    throttle = None

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK, burst_window=DEFAULT_BURST_WINDOW,
                 wait_window=DEFAULT_WAIT_WINDOW):
        self.throttle = BurstThrottle(pool_maxsize, burst_window, wait_window)
        super(MyHttpAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize,
                                            max_retries=max_retries, pool_block=pool_block)

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        request_successful = False
        response = None
        # Note: this loop retries indefinitely for as long as the server answers 429.
        while not request_successful:
            # Sleep (cooperatively) until the throttle grants a slot.
            wait_time = self.throttle.throttle()
            while wait_time > datetime.timedelta(0):
                gevent.sleep(wait_time.total_seconds(), ref=True)
                wait_time = self.throttle.throttle()

            response = super(MyHttpAdapter, self).send(request, stream=stream, timeout=timeout,
                                                       verify=verify, cert=cert, proxies=proxies)

            # A 429 means we were still rate-limited; go around and wait again.
            if response.status_code != 429:
                request_successful = True

        return response
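
The question also asks how to retry after some time when HTTP 429 occurs. A server may include a Retry-After header with a 429 response, given either as a number of seconds or as an HTTP date. As a rough sketch only (the helper name retry_after_seconds and the 15-second default are my own assumptions, not part of requests or GRequests), the wait time could be derived with the standard library like this:

```python
import time
from email.utils import mktime_tz, parsedate_tz


def retry_after_seconds(headers, default=15.0, now=None):
    """Return the number of seconds to wait, based on a Retry-After header.

    `headers` is any mapping of header names to values. A plain dict is
    case-sensitive, so the key must be spelled 'Retry-After' here.
    `default` is a hypothetical fallback used when the header is missing
    or cannot be parsed.
    """
    value = headers.get('Retry-After')
    if value is None:
        return default

    value = value.strip()
    if value.isdigit():
        # Form 1: a delay in whole seconds, e.g. "Retry-After: 120".
        return float(value)

    # Form 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
    parsed = parsedate_tz(value)
    if parsed is None:
        return default

    if now is None:
        now = time.time()
    # Never return a negative delay if the date is already in the past.
    return max(0.0, mktime_tz(parsed) - now)
```

Inside send(), sleeping for retry_after_seconds(response.headers) with gevent.sleep before the next loop iteration would be one way to use this, though whether a given site actually sends the header is something you would have to check.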

Setup:

requests_adapter = adapter.MyHttpAdapter(
    pool_connections=__CONCURRENT_LIMIT__,
    pool_maxsize=__CONCURRENT_LIMIT__,
    max_retries=0,
    pool_block=False,
    burst_window=datetime.timedelta(seconds=5),
    wait_window=datetime.timedelta(seconds=20))

requests_session = requests.session()
requests_session.mount('http://', requests_adapter)
requests_session.mount('https://', requests_adapter)

unsent_requests = (grequests.get(url,
                                 hooks={'response': handle_response},
                                 session=requests_session) for url in urls)
grequests.map(unsent_requests, size=__CONCURRENT_LIMIT__)


Answer 2:

Take a look at this for automatic requests throttling: https://pypi.python.org/pypi/RequestsThrottler/0.2.2

You can either set a fixed delay between requests or specify a number of requests to send within a fixed number of seconds (which amounts to basically the same thing):

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
with BaseThrottler(name='base-throttler', delay=1.5) as bt:
    throttled_requests = bt.multi_submit(reqs)

Here multi_submit returns a list of ThrottledRequest objects (see the documentation linked at the end).

You can then access the responses:

for tr in throttled_requests:
    print tr.response

Alternatively, you can achieve the same thing by specifying the number of requests to send in a fixed amount of time (e.g. 15 requests every 60 seconds):

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
with BaseThrottler(name='base-throttler', reqs_over_time=(15, 60)) as bt:
    throttled_requests = bt.multi_submit(reqs)

Both solutions can also be implemented without the with statement:

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
bt = BaseThrottler(name='base-throttler', delay=1.5)
bt.start()
throttled_requests = bt.multi_submit(reqs)
bt.shutdown()

For more details: http://pythonhosted.org/RequestsThrottler/index.html
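
To make the "basically the same thing" remark concrete: if requests are spaced evenly, a limit of N requests per T seconds corresponds to a delay of T / N seconds between requests. This is only a sketch of the arithmetic, with a hypothetical helper name, and it assumes even spacing rather than describing how RequestsThrottler schedules requests internally:

```python
def delay_from_rate(max_requests, per_seconds):
    """Return the evenly spaced delay (in seconds) that keeps the request
    rate at or below `max_requests` per `per_seconds`."""
    if max_requests <= 0:
        raise ValueError('max_requests must be positive')
    return float(per_seconds) / max_requests


# The limit from the question: roughly 8 requests per 15 seconds.
question_delay = delay_from_rate(8, 15)

# The example above: 15 requests every 60 seconds.
example_delay = delay_from_rate(15, 60)
```

For the limit described in the question this gives a delay of a little under two seconds, so a value such as delay=2 would stay below it.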



Answer 3:

It doesn't look like there's any simple mechanism for handling this built into the requests or grequests code. The only hook that seems to be available is for responses.

Here's a super hacky work-around to at least prove it's possible - I modified grequests to keep a list of the time when a request was issued and sleep the creation of the AsyncRequest until the requests per second were below the maximum.

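import time

q = []  # Module-level list of timestamps at which requests were issued.


class AsyncRequest(object):
    def __init__(self, method, url, **kwargs):
        print self,'init'
        waiting=True
        while waiting:
            # Allow at most 8 requests in any 15-second window.
            if len([x for x in q if x > time.time()-15]) < 8:
                q.append(time.time())
                waiting=False
            else:
                print self,'snoozing'
                gevent.sleep(1)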

You can use grequests.imap() to watch this interactively:

import time
import rg

urls = [
        'http://www.heroku.com',
        'http://python-tablib.org',
        'http://httpbin.org',
        'http://python-requests.org',
        'http://kennethreitz.com',
        'http://www.cnn.com',
]

def print_url(r, *args, **kwargs):
        print(r.url),time.time()

hook_dict=dict(response=print_url)
rs = (rg.get(u, hooks=hook_dict) for u in urls)
for r in rg.imap(rs):
        print r

I wish there was a more elegant solution, but so far I can't find one. Looked around in sessions and adapters. Maybe the poolmanager could be augmented instead?

Also, I wouldn't put this code in production - the 'q' list never gets trimmed and would eventually get pretty big. Plus, I don't know if it's actually working as advertised. It just looks like it is when I look at the console output.

Ugh. Just looking at this code I can tell it's 3am. Time to goto bed.
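
As for the untrimmed 'q' list mentioned above, one possible fix is to keep the timestamps in a collections.deque and drop the expired ones on every check. The following is a minimal sketch, not a drop-in patch for grequests; the class name, method name, and the injectable clock parameter are all made up for illustration:

```python
import collections
import time


class SlidingWindowLimiter(object):
    """Allow at most `max_requests` in any `window_seconds` interval."""

    def __init__(self, max_requests, window_seconds, clock=time.time):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # Injectable so the limiter can be tested.
        self.sent = collections.deque()  # Timestamps, oldest first.

    def try_acquire(self):
        """Record a request and return True if one may be sent now."""
        now = self.clock()
        # Trim timestamps that have fallen out of the window, so the
        # deque never holds more than `max_requests` entries.
        while self.sent and self.sent[0] <= now - self.window_seconds:
            self.sent.popleft()
        if len(self.sent) < self.max_requests:
            self.sent.append(now)
            return True
        return False
```

With something like limiter = SlidingWindowLimiter(8, 15), the loop in AsyncRequest.__init__ could call limiter.try_acquire() and sleep with gevent.sleep(1) whenever it returns False.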



Answer 4:

I had a similar problem. Here's my solution. In your case, I would do:

def worker():
    with rate_limit('slow.domain.com', 2):
        response = requests.get('https://slow.domain.com/path')
        text = response.text
    # Use `text`

Assuming you have multiple domains you're pulling from, I would set up a dictionary mapping each domain to its delay so you don't hit your rate limits.

This code assumes you're going to use gevent and monkey patch.

from contextlib import contextmanager
import gevent
from gevent.event import Event
from gevent.queue import Queue
from time import time


def rate_limit(resource, delay, _queues={}):
    """Delay use of `resource` until after `delay` seconds have passed.

    Example usage:

    def worker():
        with rate_limit('foo.bar.com', 1):
            response = requests.get('https://foo.bar.com/path')
            text = response.text
        # use `text`

    This will serialize and delay requests from multiple workers for resource
    'foo.bar.com' by 1 second.

    """

    if resource not in _queues:
        queue = Queue()
        gevent.spawn(_watch, queue)
        _queues[resource] = queue

    return _resource_manager(_queues[resource], delay)


def _watch(queue):
    "Watch `queue` and wake event listeners after delay."

    last = 0

    while True:
        event, delay = queue.get()

        now = time()

        if (now - last) < delay:
            gevent.sleep(delay - (now - last))

        event.set()   # Wake worker but keep control.
        event.clear()
        event.wait()  # Yield control until woken.

        last = time()


@contextmanager
def _resource_manager(queue, delay):
    "`with` statement support for `rate_limit`."

    event = Event()
    queue.put((event, delay))

    event.wait() # Wait for queue watcher to wake us.

    try:
        yield
    finally:
        event.set()  # Wake queue watcher, even if the body raised.
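
The domain-to-delay dictionary mentioned earlier in this answer could look something like the sketch below. The names DOMAIN_DELAYS, DEFAULT_DELAY and delay_for are hypothetical, and the delay values are placeholders rather than real limits for any site:

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2

# Placeholder delays in seconds, keyed by host name.
DOMAIN_DELAYS = {
    'slow.domain.com': 2,
    'foo.bar.com': 1,
}
DEFAULT_DELAY = 0.5  # Assumed fallback for hosts not listed above.


def delay_for(url):
    """Return (host, delay) for `url`, falling back to DEFAULT_DELAY."""
    host = urlparse(url).hostname
    return host, DOMAIN_DELAYS.get(host, DEFAULT_DELAY)
```

A worker could then call resource, delay = delay_for(url) and enter with rate_limit(resource, delay): before making the request.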