Add queueing to Angular's HttpClient

Posted 2020-02-24 11:49

Question:

I have exactly the same requirement as described in "Add queueing to angulars $http service", but I need an implementation for Angular 4.3 or 5 using the HttpInterceptor from @angular/common/http.

I have a very quirky API that can only handle a single request at a time for a particular browser session. Therefore, I need to ensure that every request made in the same session goes into a queue, and that the queue is executed one request at a time until it is empty.
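
To make the desired behaviour concrete, here is a small framework-free sketch of such a queue. It is only an illustration, not Angular code: SerialQueue, Task and enqueue are names made up for this example, and each task is assumed to call the done callback it receives once its work has finished.

```typescript
// Hypothetical names: SerialQueue, Task and enqueue are illustrative only.
// A task starts some work and calls `done` once that work has finished.
type Task = (done: () => void) => void;

class SerialQueue {
  private waiting: Task[] = [];
  private busy = false;

  // Add a task; it starts immediately only if nothing else is running.
  enqueue(task: Task): void {
    this.waiting.push(task);
    this.drain();
  }

  // Start the next waiting task unless one is already in flight.
  private drain(): void {
    if (this.busy) {
      return;
    }
    const task = this.waiting.shift();
    if (!task) {
      return; // queue is empty
    }
    this.busy = true;
    task(() => {
      // The running task has finished: free the slot and move on.
      this.busy = false;
      this.drain();
    });
  }
}
```

An HTTP version would follow the same shape: the task fires the request, and done is called when the response (or an error) arrives.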

Answer 1:


Solution


@Zlatko has suggested the correct approach, but his code has a few logical and syntax issues. I have corrected them, and the working code is pasted below:

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';

export class PendingRequest {
  url: string;
  method: string;
  options: any;
  subscription: Subject<any>;

  constructor(url: string, method: string, options: any, subscription: Subject<any>) {
    this.url = url;
    this.method = method;
    this.options = options;
    this.subscription = subscription;
  }
}

@Injectable()
export class BackendService {
  private requests$ = new Subject<any>();
  private queue: PendingRequest[] = [];

  constructor(private httpClient: HttpClient) {
    this.requests$.subscribe(request => this.execute(request));
  }

  /** Call this method to add your http request to queue */
  invoke(url, method, params, options) {
    return this.addRequestToQueue(url, method, params, options);
  }

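  private execute(requestData: PendingRequest) {
    // request() honours the HTTP method, so POST/PUT work as well as GET.
    // The error and complete callbacks advance the queue, so .finally is not needed.
    this.httpClient.request(requestData.method, requestData.url, requestData.options || {})
      .subscribe(
        res => requestData.subscription.next(res),
        err => {
          // Forward the error to the caller and keep draining the queue.
          requestData.subscription.error(err);
          this.queue.shift();
          this.startNextRequest();
        },
        () => {
          requestData.subscription.complete();
          this.queue.shift();
          this.startNextRequest();
        }
      );
  }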

  private addRequestToQueue(url, method, params, options) {
    const sub = new Subject<any>();
    const request = new PendingRequest(url, method, options, sub);

    this.queue.push(request);
    if (this.queue.length === 1) {
      this.startNextRequest();
    }
    return sub;
  }

  private startNextRequest() {
    // get next request, if any.
    if (this.queue.length > 0) {
      this.execute(this.queue[0]);
    }
  }
}

You can call the service above as follows to make HTTP calls from any other component or service. (You need to inject BackendService first, e.g. register it as a provider, ideally at module level so that all callers share one queue, and declare it in the constructor.)

    this.backendService.invoke('https://jsonplaceholder.typicode.com/posts', 'Get', null, null)
    .subscribe(
      result => {
        this.data = result;
      }
    );

For anyone who wants to see it running, there is a working Plunker.



Answer 2:

You can do this relatively easily. A naive example follows below.

It has only minimal typing, it's not elaborate, it has a few weak points, and it would be better to extract the queueing part and the HTTP-requesting part into different services or classes, but this should get you started.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Subject } from 'rxjs/Subject';

interface PendingRequest {
  url: string;
  method: string;
  options: any;
  subscription: Subject<any>;
}

@Injectable()
export class BackendService {
  // This is your private requests queue emitter.
  private requests$ = new Subject<PendingRequest>();
  // Index 0 holds the request in flight; the rest are waiting.
  private queue: PendingRequest[] = [];

  constructor(private httpClient: HttpClient) {
    // subscribe to that queue up there
    this.requests$.subscribe(request => this.execute(request));
  }

  // This is your public API - you can extend it to get/post/put or specific
  // endpoints like 'getUserProfile()' etc.
  invoke(url: string, method: string, options?: any) {
    return this.addRequestToQueue(url, method, options);
  }

  private execute(requestData: PendingRequest) {
    this.httpClient.request(requestData.method, requestData.url, requestData.options || {})
      .subscribe(
        res => requestData.subscription.next(res),
        err => {
          requestData.subscription.error(err);
          // as a last step, invoke next request if any
          this.startNextRequest();
        },
        () => {
          requestData.subscription.complete();
          this.startNextRequest();
        }
      );
  }

  private addRequestToQueue(url: string, method: string, options: any) {
    const sub = new Subject<any>();
    const request: PendingRequest = { url, method, options, subscription: sub };
    this.queue.push(request);
    // if nothing else is in flight, execute immediately.
    if (this.queue.length === 1) {
      this.requests$.next(request);
    }
    // otherwise it waits in the queue until the requests ahead of it finish.
    return sub;
  }

  private startNextRequest() {
    // drop the request that just finished, then start the next one, if any.
    this.queue.shift();
    if (this.queue.length) {
      this.requests$.next(this.queue[0]);
    }
  }
}
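
The suggestion above to split the queueing from the HTTP-requesting part could look roughly like the sketch below. It rests on assumptions: TaskQueue and run are hypothetical names, and tasks are plain functions that return a Promise rather than an Observable. The queue itself knows nothing about HTTP; it only guarantees that a task starts after the previous one has settled.

```typescript
// Hypothetical helper: a queue that knows nothing about HTTP.
// It runs any promise-returning task strictly after the previous one settles.
class TaskQueue {
  // Settles when the most recently queued task has finished (never rejects).
  private tail: Promise<void> = Promise.resolve();

  run<T>(task: () => Promise<T>): Promise<T> {
    // Start this task only once everything queued before it has settled.
    const result = this.tail.then(() => task());
    // Swallow failures on the internal chain so that one failed task
    // does not block the tasks queued behind it. The caller still sees
    // the rejection through `result`.
    this.tail = result.then(() => undefined, () => undefined);
    return result;
  }
}
```

With HttpClient, a task might be something like () => this.httpClient.get(url).toPromise(), which keeps the class that talks HTTP separate from the class that orders the calls.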


Answer 3:

I have exactly the same requirement as you. The other answers work perfectly well, but they require developers to create requests through a custom service instead of the native HttpClient. You could try the following interceptor to apply queueing instead.

This solution requires you to add two services: an HttpInterceptor and a service (RequestQueueService) that manages the queue.

HttpInterceptor:

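import { Injectable } from '@angular/core';
import { HttpEvent, HttpHandler, HttpInterceptor, HttpRequest } from '@angular/common/http';
import { Observable } from 'rxjs';
// Import path is an assumption; adjust it to your project layout.
import { RequestQueueService } from './request-queue.service';

@Injectable()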
export class QueueInterceptorService implements HttpInterceptor {
  constructor(private queueService: RequestQueueService) { }

  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    return this.queueService.intercept(request, next);
  }
}

RequestQueueService:

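import { Injectable } from '@angular/core';
import { HttpEvent, HttpEventType, HttpHandler, HttpRequest } from '@angular/common/http';
import { Observable, ReplaySubject } from 'rxjs';
import { catchError, switchMap, tap } from 'rxjs/operators';

@Injectable({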
  providedIn: 'root'
})
export class RequestQueueService {
  private queue: ReplaySubject<any>[] = [];

  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    const requestQueueItem$ = new ReplaySubject<any>();
    const result$ = requestQueueItem$.pipe(
      switchMap(() => next.handle(request).pipe(
        tap(event => {
          if (event.type === HttpEventType.Response) {
            this.processNextRequest();
          }
        }),
        catchError(err => {
          this.processNextRequest();
          throw err;
        })
      ))
    );
    this.queue.push(requestQueueItem$);

    if (this.queue.length <= 1) {
      this.dispatchRequest();
    }

    return result$;
  }

  private processNextRequest(): void {
    if (this.queue && this.queue.length > 0) {
      this.queue.shift();
    }
    this.dispatchRequest();
  }

  private dispatchRequest(): void {
    if (this.queue.length > 0) {
      const nextSub$ = this.queue[0];
      nextSub$.next();
      nextSub$.complete();
    }
  }
}

Lastly, in AppModule:

@NgModule({
  declarations: [
    AppComponent
  ],
  imports: [
    BrowserModule,
    HttpClientModule
  ],
  providers: [
    RequestQueueService,
    { provide: HTTP_INTERCEPTORS, useClass: QueueInterceptorService, multi: true }
  ],
  bootstrap: [AppComponent]
})
export class AppModule { }

By the way, I am using Angular 8 with RxJS 6.4.
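
One caveat about the RequestQueueService above: as far as I can tell, the queue only advances when a Response event or an error arrives, so a request that is unsubscribed (cancelled) while waiting or in flight would leave its entry at the head and stall everything behind it. The bookkeeping needed to avoid that can be sketched without Angular or RxJS. CancellableQueue and add are hypothetical names for this example: start is called when an entry reaches the head, and the returned function is meant to be called when the entry finishes or is cancelled.

```typescript
// Hypothetical model of the queue bookkeeping, independent of Angular/RxJS.
// `start` is called when an entry reaches the head of the queue; the returned
// function must be called when the entry finishes OR is cancelled.
class CancellableQueue {
  private entries: Array<{ start: () => void }> = [];

  add(start: () => void): () => void {
    const entry = { start };
    this.entries.push(entry);
    if (this.entries.length === 1) {
      entry.start(); // nothing ahead of us, so run right away
    }
    let released = false;
    return () => {
      if (released) {
        return; // ignore repeated calls
      }
      released = true;
      const index = this.entries.indexOf(entry);
      const wasHead = index === 0;
      this.entries.splice(index, 1);
      // Only a departing head frees the slot for the next entry;
      // removing a waiting entry just takes it out of line.
      if (wasHead && this.entries.length > 0) {
        this.entries[0].start();
      }
    };
  }
}
```

In an RxJS implementation, the release function would fit naturally into a finalize operator, which runs on completion, error, and unsubscription alike.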