Question:
I could use some pseudo-code, or better, Python. I am trying to implement a rate-limiting queue for a Python IRC bot, and it partially works, but if someone triggers fewer messages than the limit (e.g., the rate limit is 5 messages per 8 seconds, and the person triggers only 4), and the next trigger comes after the 8 seconds (e.g., 16 seconds later), the bot sends the message, but the queue becomes full and the bot waits 8 seconds, even though that's not needed since the 8-second period has elapsed.
Answer 1:
Here is the simplest algorithm, if you just want to drop messages when they arrive too quickly (instead of queuing them, which makes sense because the queue might get arbitrarily large):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;
There are no data structures, timers, etc. in this solution, and it works cleanly :) To see this, note that 'allowance' grows at a rate of at most 5/8 units per second, i.e. at most five units per eight seconds. Every message that is forwarded deducts one unit, so you can't send more than five messages per eight seconds.
Note that rate should be an integer, i.e. with no non-zero fractional part, or the algorithm won't work correctly (the actual rate will not be rate/per). E.g. rate=0.5; per=1.0; does not work because allowance will never grow to 1.0. But rate=1.0; per=2.0; works fine.
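For anyone who wants to check the fractional-rate caveat, here is a minimal Python sketch of the algorithm above. The class name `TokenBucket`, the method `allow()` and the injectable `clock` parameter are my own choices for illustration, not part of the original answer; the fake clock is only there so the behaviour can be observed without real waiting.

```python
import time


class TokenBucket:
    """Drop-style limiter following the pseudocode above (names are illustrative)."""

    def __init__(self, rate, per, clock=time.monotonic):
        self.rate = float(rate)      # messages allowed per period
        self.per = float(per)        # period length in seconds
        self.clock = clock           # injectable so the logic can be tested
        self.allowance = self.rate   # start with a full bucket
        self.last_check = clock()

    def allow(self):
        """Return True if a message may be forwarded now, False to discard it."""
        current = self.clock()
        self.allowance += (current - self.last_check) * (self.rate / self.per)
        self.last_check = current
        if self.allowance > self.rate:
            self.allowance = self.rate   # cap at the bucket size
        if self.allowance < 1.0:
            return False
        self.allowance -= 1.0
        return True


# Demo with a fake clock: both limiters nominally allow 0.5 messages per second.
fake_now = [0.0]
ok = TokenBucket(1.0, 2.0, clock=lambda: fake_now[0])
bad = TokenBucket(0.5, 1.0, clock=lambda: fake_now[0])
for _ in range(5):
    print(fake_now[0], ok.allow(), bad.allow())
    fake_now[0] += 2.0
```

With one attempt every two seconds, `ok` forwards every message while `bad` forwards none, because its allowance is capped at 0.5 and can never reach 1.0.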
Answer 2:
Use this decorator, @RateLimited(ratepersec), before the function that enqueues.
Basically, this checks whether 1/rate seconds have passed since the last call. If not, it waits for the remainder of that time; otherwise it doesn't wait. This effectively limits you to rate calls per second. The decorator can be applied to any function you want rate-limited.
In your case, if you want a maximum of 5 messages per 8 seconds, use @RateLimited(0.625) before your sendToQueue function.
import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args, **kargs):
            # time.monotonic() is used because time.clock() was removed in Python 3.8
            elapsed = time.monotonic() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.monotonic()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print(num)

if __name__ == "__main__":
    print("This should print 1,2,3... at about 2 per second.")
    for i in range(1, 100):
        PrintNumber(i)
Answer 3:
A Token Bucket is fairly simple to implement.
Start with a bucket with 5 tokens.
Every 8/5 seconds (1.6 s): If the bucket has fewer than 5 tokens, add one.
Each time you want to send a message: If the bucket has ≥1 token, take one token out and send the message. Otherwise, wait/drop the message/whatever.
(Obviously, in actual code, you'd use an integer counter instead of real tokens, and you can optimize out the periodic refill step by storing timestamps.)
Reading the question again, if the rate limit is fully reset every 8 seconds, then here is a modification:
Start with a timestamp, last_send, at a time long ago (e.g., at the epoch). Also, start with the same 5-token bucket.
Strike the periodic refill rule.
Each time you send a message: First, check if last_send is ≥ 8 seconds ago. If so, fill the bucket (set it to 5 tokens). Second, if there are tokens in the bucket, send the message (otherwise, drop/wait/etc.). Third, set last_send to now.
That should work for that scenario.
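The three steps of the modified scheme could be sketched in Python roughly as follows. This is only an illustration: `ResettingBucket`, `try_send` and the `clock` parameter are hypothetical names, and I am assuming that last_send is updated only when a message is actually sent and that a message without a token is simply dropped.

```python
import time


class ResettingBucket:
    """Bucket that is refilled completely after `window` seconds of silence."""

    def __init__(self, burst=5, window=8.0, clock=time.monotonic):
        self.burst = burst
        self.window = window
        self.clock = clock                 # injectable for testing
        self.tokens = burst
        self.last_send = float("-inf")     # "a time long ago"

    def try_send(self, send, msg):
        """Send msg via send(msg) if a token is available; return True if sent."""
        now = self.clock()
        # First: refill completely if the last send is at least `window` ago.
        if now - self.last_send >= self.window:
            self.tokens = self.burst
        # Second: send only if a token is left (assumption: otherwise drop).
        if self.tokens < 1:
            return False
        self.tokens -= 1
        send(msg)
        # Third: remember when we last sent something.
        self.last_send = now
        return True
```

In the scenario from the question (4 messages, then a pause of 16 seconds), the next call finds last_send more than 8 seconds in the past, refills the bucket, and all 5 following messages go out without waiting.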
I've actually written an IRC bot using a strategy like this (the first approach). It's in Perl, not Python, but here is some code to illustrate:
The first part here handles adding tokens to the bucket. You can see the optimization of adding tokens based on time (second-to-last line), and then the last line clamps the bucket contents to the maximum (MESSAGE_BURST).
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$conn is a data structure which is passed around. This is inside a method that runs routinely (it calculates when it will next have something to do, and sleeps either that long or until it gets network traffic). The next part of the method handles sending. It is rather complicated, because messages have priorities associated with them.
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
    # Ultimate is special. We run ultimate no matter what. Even if
    # it sends the bucket negative.
    --$bucket;
    $entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
That's the first queue, which is run no matter what, even if it gets our connection killed for flooding. It is used for extremely important things, like responding to the server's PING. Next, the rest of the queues:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
    my $queue = $queues->[$pri];
    while (scalar(@$queue)) {
        if ($bucket < 1) {
            # continue later.
            $need_more_time = 1;
            last QRUN;
        } else {
            --$bucket;
            my $entry = shift @$queue;
            $entry->{code}(@{$entry->{args}});
        }
    }
}
Finally, the bucket status is saved back to the $conn data structure (actually a bit later in the method; it first calculates how soon it will have more work):
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
As you can see, the actual bucket handling code is very small, about four lines. The rest of the code is priority queue handling. The bot has priority queues so that, e.g., someone chatting with it can't prevent it from doing its important kick/ban duties.
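Since the question asked for Python, here is a rough Python rendering of the queue-draining part of the Perl code. It is a sketch under assumptions: the numeric priority values and the extra PRIORITY_NORMAL level are made up for illustration (the Perl excerpt does not show them), and each queue entry is assumed to be a `(function, args)` tuple.

```python
from collections import deque

# Hypothetical priority values; only the ordering JUNK < HIGH < ULTIMATE matters.
PRIORITY_JUNK, PRIORITY_NORMAL, PRIORITY_HIGH, PRIORITY_ULTIMATE = range(4)


def run_queues(queues, bucket):
    """Drain the queues by priority; return (bucket, need_more_time)."""
    # Ultimate entries always run, even if that sends the bucket negative.
    ultimate = queues[PRIORITY_ULTIMATE]
    while ultimate:
        func, args = ultimate.popleft()
        bucket -= 1
        func(*args)
    # Remaining queues, highest priority first, for as long as tokens last.
    for pri in range(PRIORITY_HIGH, PRIORITY_JUNK - 1, -1):
        queue = queues[pri]
        while queue:
            if bucket < 1:
                return bucket, True    # out of tokens: continue later
            bucket -= 1
            func, args = queue.popleft()
            func(*args)
    return bucket, False


# Usage: one deque per priority level, then drain with the current bucket.
out = []
queues = [deque() for _ in range(4)]
queues[PRIORITY_ULTIMATE].append((out.append, ("PONG",)))
queues[PRIORITY_HIGH].append((out.append, ("kick",)))
queues[PRIORITY_JUNK].append((out.append, ("chat",)))
bucket, need_more_time = run_queues(queues, 2.0)
```

The caller would refill `bucket` from elapsed time before each call and store it afterwards, as in the Perl snippets.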
Answer 4:
To block processing until the message can be sent, thus queuing up further messages, antti's beautiful solution may also be modified like this:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    sleep((1.0 - allowance) * (per / rate)); // e.g. time.sleep in Python; wait until one full unit has accrued
    last_check = now(); // the wait is already paid for, so don't credit it again on the next message
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;
It just waits until there is enough allowance to send the message. To avoid starting with up to twice the rate, allowance may also be initialized to 0.
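A minimal Python sketch of this blocking variant, in case it helps. The names `BlockingLimiter` and `wait()` and the injectable `clock`/`sleep` parameters are my own assumptions for illustration. After sleeping, the sketch resets last_check to the current time so the time spent waiting is not credited a second time on the next call.

```python
import time


class BlockingLimiter:
    """Blocks the caller until a message may be sent (illustrative names)."""

    def __init__(self, rate, per, clock=time.monotonic, sleep=time.sleep):
        self.rate = float(rate)      # messages per period
        self.per = float(per)        # period in seconds
        self.clock = clock           # injectable for testing
        self.sleep = sleep           # injectable for testing
        self.allowance = self.rate   # use 0.0 here to forbid an initial burst
        self.last_check = clock()

    def wait(self):
        """Return once one message may be sent; consumes one unit of allowance."""
        current = self.clock()
        self.allowance += (current - self.last_check) * (self.rate / self.per)
        self.allowance = min(self.allowance, self.rate)
        if self.allowance < 1.0:
            # Sleep exactly long enough for the allowance to reach 1.0.
            self.sleep((1.0 - self.allowance) * (self.per / self.rate))
            current = self.clock()   # the sleep has now been accounted for
            self.allowance = 1.0
        self.last_check = current
        self.allowance -= 1.0
```

Usage would be `limiter.wait()` immediately before each send. If the sleep overshoots, the extra time is simply not credited, which errs on the side of sending too slowly rather than too fast.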
Answer 5:
Keep the times at which the last five lines were sent. Hold the queued messages until the fifth-most-recent message (if it exists) is at least 8 seconds in the past (with last_five as an array of times):
now = time.time()
# last_five[-1] is the oldest entry; it only matters once five sends are recorded
if len(last_five) < 5 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
    if len(last_five) > 5:
        last_five.pop()
Answer 6:
One solution is to attach a timestamp to each queue item and to discard the item after 8 seconds have passed. You can perform this check each time the queue is added to.
This only works if you limit the queue size to 5 and discard any additions whilst the queue is full.
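A small Python sketch of how I read this idea: the "queue" holds only the timestamps of accepted items, expired timestamps are discarded on every addition, and an addition is rejected while five unexpired timestamps remain. `WindowLimiter`, `try_add` and the `clock` parameter are hypothetical names, not from the answer.

```python
import time
from collections import deque


class WindowLimiter:
    """At most `limit` accepted items within any `window` seconds."""

    def __init__(self, limit=5, window=8.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock        # injectable for testing
        self.stamps = deque()     # timestamps of accepted items, oldest first

    def try_add(self):
        """Return True if the item is accepted, False if the queue is full."""
        now = self.clock()
        # Discard timestamps that have aged out of the window.
        while self.stamps and now - self.stamps[0] >= self.window:
            self.stamps.popleft()
        if len(self.stamps) >= self.limit:
            return False          # queue full: reject this addition
        self.stamps.append(now)
        return True
```

The bot would call `try_add()` before sending and drop (or hold) the message when it returns False.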
Answer 7:
If anyone is still interested, I use this simple callable class in conjunction with a timed LRU key-value store to limit the request rate per IP. It uses a deque, but can be rewritten to use a list instead.
from collections import deque
import time

class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0, 100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
Answer 8:
How about this:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }
    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }
    return false;
}
Answer 9:
I needed a variation in Scala. Here it is:
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }
}
Here is how it can be used:
val f = Limiter((5d, 8d), {
  _: Unit ⇒
    println(System.currentTimeMillis)
})
while (true) { f(()) }
Answer 10:
Just a Python implementation of the code from the accepted answer.
import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check
        scope.last_check = current
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
            scope.allowance = rate
        if (scope.allowance < 1):
            pass
        else:
            fn()
            scope.allowance = scope.allowance - 1
    return throttler