How do I rate limit requests losslessly using RxJS

Posted 2019-01-26 19:34

Question:

I would like to make a series of requests to a server, but the server has a hard rate limit of 10 requests per second. If I make the requests in a loop, I hit the rate limit, because all the requests happen at the same time.

for(let i = 0; i < 20; i++) {
  sendRequest();
}

ReactiveX has lots of tools for modifying observable streams, but I can't seem to find the tools to implement rate limiting. I tried adding a standard delay, but the requests still fire at the same time, just 100ms later than they did previously.

const queueRequest$ = new Rx.Subject<number>();

queueRequest$
  .delay(100)
  .subscribe(queueData => {
    console.log(queueData);
  });

const queueRequest = (id) => queueRequest$.next(id);

function fire20Requests() {
  for (let i=0; i<20; i++) {
    queueRequest(i);
  }
}

fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);
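A simplified timing model makes the problem with delay concrete. The sketch below is plain TypeScript, not RxJS: `delayModel` is a hypothetical helper that represents each value by the time (in ms) at which it reaches the operator, and it ignores timer jitter.

```typescript
// Simplified timing model (plain TypeScript, not RxJS): each value is
// represented by the time, in ms, at which it reaches the operator.
// delay(ms) adds the same offset to every value, so a burst stays a burst.
function delayModel(arrivalTimes: number[], ms: number): number[] {
  return arrivalTimes.map((t) => t + ms);
}

// fire20Requests() calls queueRequest 20 times synchronously,
// so all 20 values arrive at t = 0.
const burstTimes: number[] = [];
for (let i = 0; i < 20; i++) {
  burstTimes.push(0);
}

const delayed = delayModel(burstTimes, 100);
// All 20 values are emitted at t = 100 ms: the same burst, 100 ms later.
console.log(delayed.every((t) => t === 100)); // true
```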

The debounceTime and throttleTime operators are also similar to what I'm looking for, but they are lossy rather than lossless. I want to preserve every request I make instead of discarding the earlier ones.

...
queueRequest$
  .debounceTime(100)
  .subscribe(queueData => {
    sendRequest();
  });
...
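The same kind of model shows why debounceTime loses requests. `debounceModel` is a hypothetical helper written for illustration (plain TypeScript, not RxJS); it assumes sorted arrival times, a source that never completes, and treats an exact tie between a pending timer and a new value as "the timer fires first".

```typescript
// Simplified model of debounceTime(windowMs) (plain TypeScript, not RxJS).
// Input: sorted arrival times in ms. Output: indices of the values emitted.
function debounceModel(arrivalTimes: number[], windowMs: number): number[] {
  const emitted: number[] = [];
  for (let i = 0; i < arrivalTimes.length; i++) {
    const isLast = i === arrivalTimes.length - 1;
    // A value survives only if no newer value arrives within windowMs.
    if (isLast || arrivalTimes[i + 1] - arrivalTimes[i] >= windowMs) {
      emitted.push(i);
    }
  }
  return emitted;
}

// Three bursts of 20 requests at t = 0, 1000 and 5000 ms, like fire20Requests.
const questionArrivals: number[] = [];
for (const start of [0, 1000, 5000]) {
  for (let i = 0; i < 20; i++) {
    questionArrivals.push(start);
  }
}

// Only the last request of each burst survives: 3 out of 60.
console.log(debounceModel(questionArrivals, 100).join(",")); // 19,39,59
```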

How do I make these requests to the server without exceeding the rate limit using ReactiveX and Observables?

Answer 1:

The implementation in the OP's self answer (and in the linked blog) always imposes a delay which is less than ideal.

If the rate-limited service allows for 10 requests per second, it should be possible to make 10 requests in, say, 10 milliseconds, as long as the next request is not made for another 990 milliseconds.

The implementation below applies a variable delay to ensure the limit is enforced and the delay is only applied to requests that would see the limit exceeded.

function rateLimit(source, count, period) {

  return source
    .scan((records, value) => {

      const now = Date.now();
      const since = now - period;

      // Keep a record of all values received within the last period.

      records = records.filter((record) => record.until > since);
      if (records.length >= count) {

        // until is the time until which the value should be delayed.

        const firstRecord = records[0];
        const lastRecord = records[records.length - 1];
        const until = firstRecord.until + (period * Math.floor(records.length / count));

        // concatMap is used below to guarantee the values are emitted
        // in the same order in which they are received, so the delays
        // are cumulative. That means the actual delay is the difference
        // between the until times.

        records.push({
          delay: (lastRecord.until < now) ?
            (until - now) :
            (until - lastRecord.until),
          until,
          value
        });
      } else {
        records.push({
          delay: 0,
          until: now,
          value
        });
      }
      return records;

    }, [])
    .concatMap((records) => {

      const lastRecord = records[records.length - 1];
      const observable = Rx.Observable.of(lastRecord.value);
      return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
    });
}

const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 30),
  10,
  1000
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
(This snippet runs with RxJS 5, loaded from https://unpkg.com/rxjs@5/bundles/Rx.min.js.)
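The scheduling rule this answer argues for can be written as a small pure function. This is an idealized sketch in plain TypeScript, not the RxJS code above: `slidingWindowSchedule` is a hypothetical name, it works on arrival times in ms, and it ignores timer jitter. Each value is emitted no earlier than its own arrival, the previous emission (to preserve order), and one full period after the emission `count` places earlier.

```typescript
// Idealized model of a lossless sliding-window limiter (plain TypeScript, not RxJS).
// Input: sorted arrival times in ms. Output: the time each value is emitted.
function slidingWindowSchedule(
  arrivalTimes: number[],
  count: number,
  period: number,
): number[] {
  const emitTimes: number[] = [];
  for (let i = 0; i < arrivalTimes.length; i++) {
    // Never emit before the value arrives.
    let t = arrivalTimes[i];
    // Preserve order, as concatMap does in the answer's code.
    if (i > 0) {
      t = Math.max(t, emitTimes[i - 1]);
    }
    // At most `count` emissions in any half-open window of length `period`:
    // value i waits one full period after value i - count was emitted.
    if (i >= count) {
      t = Math.max(t, emitTimes[i - count] + period);
    }
    emitTimes.push(t);
  }
  return emitTimes;
}

// 30 values arriving together at t = 0, like Rx.Observable.range(1, 30).
const together: number[] = [];
for (let i = 0; i < 30; i++) {
  together.push(0);
}
const togetherTimes = slidingWindowSchedule(together, 10, 1000);
console.log(togetherTimes[9], togetherTimes[10], togetherTimes[20]); // 0 1000 2000

// Ten requests 1 ms apart (t = 0..9), then an eleventh at t = 10 ms.
const spaced: number[] = [];
for (let i = 0; i <= 10; i++) {
  spaced.push(i);
}
const spacedTimes = slidingWindowSchedule(spaced, 10, 1000);
console.log(spacedTimes[9], spacedTimes[10]); // 9 1000
```

The first ten values are never delayed; only the eleventh waits, and it goes out 990 ms after it arrived, which is the behavior described at the start of this answer.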



Answer 2:

I wrote a library to do this: you set the maximum number of requests per interval, and it rate-limits observables by delaying subscriptions. It's tested and comes with examples: https://github.com/ohjames/rxjs-ratelimiter



Answer 3:

This blog post does a great job of explaining that RxJS is great at discarding events, and how the author arrived at the answer, but ultimately the code you're looking for is:

queueRequest$
  .concatMap(queueData => Rx.Observable.of(queueData).delay(100))
  .subscribe(() => {
    sendRequest();
  });

concatMap appends each newly created inner observable to the back of the stream and subscribes to it only after the previous one has completed. Combined with delay, this pushes each event back by 100 ms after the previous one, so emissions are spaced at least 100 ms apart and at most about 10 requests happen per second. You can view the full JSBin here, which logs to the console instead of firing requests.
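The pacing produced by concatMap plus delay can be modeled as a pure function. `concatDelaySchedule` is a hypothetical plain-TypeScript helper, not RxJS; it assumes each inner observable emits and completes exactly gapMs after concatMap subscribes to it, and that concatMap subscribes only once the previous inner observable has completed.

```typescript
// Simplified model of concatMap(v => of(v).delay(gapMs)) (plain TypeScript, not RxJS).
// Input: sorted arrival times in ms. Output: the time each value is emitted.
function concatDelaySchedule(arrivalTimes: number[], gapMs: number): number[] {
  const emitTimes: number[] = [];
  let previousEmit = -Infinity;
  for (const arrival of arrivalTimes) {
    // The inner observable is subscribed once the value has arrived AND the
    // previous inner observable has completed; it then waits gapMs.
    const subscribedAt = Math.max(arrival, previousEmit);
    previousEmit = subscribedAt + gapMs;
    emitTimes.push(previousEmit);
  }
  return emitTimes;
}

// 20 requests queued at t = 0 go out one every 100 ms.
const atOnce: number[] = [];
for (let i = 0; i < 20; i++) {
  atOnce.push(0);
}
const paced = concatDelaySchedule(atOnce, 100);
console.log(paced[0], paced[1], paced[19]); // 100 200 2000

// A single request with no other traffic still waits the full 100 ms.
console.log(concatDelaySchedule([5000], 100)[0]); // 5100
```

The last call shows the trade-off of this approach: the spacing is enforced by always delaying, so even an isolated request is held back 100 ms.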



Answer 4:

Actually, there's an easier way to do this with the bufferTime() operator and its three arguments:

bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize)

We can therefore use bufferTime(1000, null, 10), which emits a buffer as soon as it holds 10 items, or after at most 1 s. The null means we want to open a new buffer immediately after the current buffer is emitted.

import { Observable } from 'rxjs/Rx'; // RxJS 5: Observable with all operators patched in

function mockRequest(val) {
  return Observable
    .of(val)
    .delay(100)
    .map(val => 'R' + val);
}

Observable
  .range(0, 55)
  .concatMap(val => Observable.of(val)
    .delay(25) // async source of values
    // .delay(175)
  )

  .bufferTime(1000, null, 10) // collect all items for 1s

  .concatMap(buffer => Observable
    .from(buffer) // make requests
    .delay(1000)  // delay this batch by 1s (rate-limit)
    .mergeMap(value => mockRequest(value)) // collect results regardless of their initial order
    .toArray()
  )
  // .timestamp()
  .subscribe(val => console.log(val));

See live demo: https://jsbin.com/mijepam/19/edit?js,console

You can experiment with different initial delays. With only 25 ms between source values, the requests are sent in batches of 10:

[ 'R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9' ]
[ 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R17', 'R18', 'R19' ]
[ 'R20', 'R21', 'R22', 'R23', 'R24', 'R25', 'R26', 'R27', 'R28', 'R29' ]
[ 'R30', 'R31', 'R32', 'R33', 'R34', 'R35', 'R36', 'R37', 'R38', 'R39' ]
[ 'R40', 'R41', 'R42', 'R43', 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]

But with .delay(175) we'll emit batches of fewer than 10 items, because the 1 s bufferTime window closes before 10 items have arrived.

[ 'R0', 'R1', 'R2', 'R3', 'R4' ]
[ 'R5', 'R6', 'R7', 'R8', 'R9', 'R10' ]
[ 'R11', 'R12', 'R13', 'R14', 'R15' ]
[ 'R16', 'R17', 'R18', 'R19', 'R20', 'R21' ]
[ 'R22', 'R23', 'R24', 'R25', 'R26', 'R27' ]
[ 'R28', 'R29', 'R30', 'R31', 'R32' ]
[ 'R33', 'R34', 'R35', 'R36', 'R37', 'R38' ]
[ 'R39', 'R40', 'R41', 'R42', 'R43' ]
[ 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]
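The grouping in these outputs can be approximated with an idealized model of bufferTime(spanMs, null, maxSize). `bufferTimeModel` and `arrivalsEvery` are hypothetical plain-TypeScript helpers, not RxJS: the model ignores timer drift, puts a value that arrives exactly when a buffer times out into the next buffer, and omits empty buffers, which RxJS would emit as [].

```typescript
// Idealized model of bufferTime(spanMs, null, maxSize) (plain TypeScript, not RxJS).
// A buffer closes when it holds maxSize values or spanMs after it opened;
// the next buffer opens at that moment. Returns the indices in each buffer.
function bufferTimeModel(arrivalTimes: number[], spanMs: number, maxSize: number): number[][] {
  const buffers: number[][] = [];
  let openedAt = 0;
  let current: number[] = [];
  for (let i = 0; i < arrivalTimes.length; i++) {
    const t = arrivalTimes[i];
    // Close timed-out buffers until t falls inside the currently open one.
    while (t >= openedAt + spanMs) {
      if (current.length > 0) buffers.push(current);
      current = [];
      openedAt += spanMs;
    }
    current.push(i);
    if (current.length === maxSize) {
      // A full buffer is emitted immediately and a new one opens now.
      buffers.push(current);
      current = [];
      openedAt = t;
    }
  }
  if (current.length > 0) buffers.push(current); // flushed when the source completes
  return buffers;
}

// Source value i arrives at (i + 1) * spacingMs, as with concatMap(... .delay(spacingMs)).
function arrivalsEvery(spacingMs: number, n: number): number[] {
  const times: number[] = [];
  for (let i = 0; i < n; i++) times.push((i + 1) * spacingMs);
  return times;
}

const fastBatches = bufferTimeModel(arrivalsEvery(25, 55), 1000, 10);
console.log(fastBatches.map((b) => b.length).join(",")); // 10,10,10,10,10,5

const slowBatches = bufferTimeModel(arrivalsEvery(175, 55), 1000, 10);
console.log(slowBatches[0].join(",")); // 0,1,2,3,4
console.log(slowBatches[1].join(",")); // 5,6,7,8,9,10
```

With 25 ms spacing the model reproduces the five batches of 10 and the final batch of 5; with 175 ms spacing it reproduces the first two batches, but since it ignores timer drift it is not expected to match the later logged batches exactly.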

There is, however, one difference from what you might need: because of .bufferTime(1000, ...) followed by .delay(1000), this solution first emits values only after a delay of up to 2 s. Subsequent batches are then emitted roughly 1 s apart.

Alternatively, you could use:

.bufferTime(1000, null, 10)
.mergeAll()
.bufferCount(10)

This always collects 10 items before performing the requests (only the final batch, emitted when the source completes, can be smaller), which would probably be more efficient.