Angular2: switchMap not canceling previous http ca

Posted 2020-05-19 02:28

Question:

I am trying to use switchMap to cancel any previous http calls in Angular2. The code is basically

var run = ():Observable<any> => {
        var url = 'http://...'
        return this._http.get(url)
            .map(result => {
                // Extract the response body as text and pass it downstream
                var xmlData:string = result.text();
                return xmlData;
            });
    }


    function pollTasks() {
        return Observable.of(1)
            .switchMap(() => run())
            .map(res => res)
    }

    // caller can do subscription and store it as a handle:
    let tasksSubscription =
        pollTasks()
            .subscribe(data => {
                console.log('aa'+data)
            });

When I call this whole sequence several times in a row, I receive several replies (i.e., several 'aa' + data logs).

I was under the impression switchMap should cancel the previous calls.

Answer 1:

The requests need to originate from the same underlying stream. Here's a factory function that will create an http service instance that should do what you want:

function httpService(url) {
   // httpRequest$ stream that allows us to push new requests
   const httpRequest$ = new Rx.Subject();

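   // httpResponse$ stream that allows clients of the httpService
   // to handle our responses (fetch here can be replaced with whatever
   // http library you're using). Note that fetch returns a Promise, so a
   // superseded response is ignored rather than aborted on the network.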
   const httpResponse$ = httpRequest$
       .switchMap(() => fetch(url));


   // Expose a single method get() that pushes a new
   // request onto the httpRequest stream. Expose the
   // httpResponse$ stream to handle responses.
   return {
       get: () => httpRequest$.next(),
       httpResponse$
   };
}

And now the client code can use this service like this:

const http = httpService('http://my.api.com/resource');

// Subscribe to any responses
http.httpResponse$.subscribe(resp => console.log(resp));

// Call http.get() a bunch of times: should only get one log!!
http.get();
http.get();
http.get();


Answer 2:

constructor(private _http:Http, private appStore:AppStore) {

        this.httpRequest$ = new Subject();

        this.httpRequest$
            .switchMap((v:any):any => {
                console.log(v);
                // An id of -1 cancels all pending network calls without starting a new one.
                // Wrap the message in an Observable so it is emitted as a single value.
                if (v.id==-1||v.id=='-1')
                    return Observable.of('bye, cancel all pending network calls');
                return this._http.get('example.com')
                    .map(result => {
                        var xmlData:string = result.text();
                        return xmlData;
                    });
            })
            .share()
            .subscribe(e => {
            })
    ...

and to push data in:

this.httpRequest$.next({id: busId});        

This works great: I now have a single service that I can pipe all network calls through, and it cancels prior calls as well.
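The "-1 means cancel everything" convention can be shown without RxJS. What follows is only a rough, dependency-free model of that rule, not the service above: `RequestPipeline`, `Pending`, and the log strings are hypothetical names made up for illustration, and no real network call is made.

```typescript
// Hypothetical record of an in-flight request (no real HTTP involved).
type Pending = { id: number; cancelled: boolean };

class RequestPipeline {
  private current: Pending | null = null;
  readonly log: string[] = [];

  // Mirrors httpRequest$.next({id}): every push supersedes the request in flight.
  next(request: { id: number }): Pending | null {
    if (this.current) {
      this.current.cancelled = true;
      this.log.push(`cancelled ${this.current.id}`);
      this.current = null;
    }
    // Sentinel id: cancel whatever was pending and start nothing new.
    if (request.id === -1) return null;
    this.current = { id: request.id, cancelled: false };
    this.log.push(`started ${request.id}`);
    return this.current;
  }
}

const pipeline = new RequestPipeline();
pipeline.next({ id: 1 });
pipeline.next({ id: 2 });
pipeline.next({ id: -1 });
console.log(pipeline.log);
// ['started 1', 'cancelled 1', 'started 2', 'cancelled 2']
```

The point of the sentinel is that cancellation goes through the same stream as ordinary requests, so callers need no separate unsubscribe handle.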

See the image below: as new calls come in, prior ones are canceled. Note how I set a slow network with a 4-second delay to prove that everything works as expected.



Answer 3:

I think switchMap can only cancel requests within the current data flow, that is, requests triggered by events on the same observable chain.

If you call your pollTasks method several times, you won't be able to cancel the previous requests because they won't be in the same data flow. You create a new observable chain each time you call the method.
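To make the difference concrete, here is a small dependency-free model of the switching rule. It is a sketch only and does not use RxJS: `FakeRequest` and `SwitchingChain` are hypothetical stand-ins, and responses are triggered by hand instead of coming from a server.

```typescript
// Hypothetical stand-in for a cancellable HTTP request.
class FakeRequest {
  cancelled = false;
  private done = false;
  constructor(public label: string, private onResult: (body: string) => void) {}
  // Simulate the server replying; a cancelled request delivers nothing.
  respond(): void {
    if (this.cancelled || this.done) return;
    this.done = true;
    this.onResult(`response to ${this.label}`);
  }
  cancel(): void {
    this.cancelled = true;
  }
}

// Models the switchMap rule: a chain tracks one current inner request and
// cancels it only when a new trigger arrives on that same chain.
class SwitchingChain {
  private current: FakeRequest | null = null;
  constructor(private subscriber: (body: string) => void) {}
  trigger(label: string): FakeRequest {
    if (this.current) this.current.cancel();
    this.current = new FakeRequest(label, this.subscriber);
    return this.current;
  }
}

// Case 1: a new chain per call, like calling pollTasks() three times.
const separateLog: string[] = [];
const separate = ['a', 'b', 'c'].map(label =>
  new SwitchingChain(body => separateLog.push(body)).trigger(label));
separate.forEach(req => req.respond());

// Case 2: one shared chain that receives three triggers.
const sharedLog: string[] = [];
const sharedChain = new SwitchingChain(body => sharedLog.push(body));
const shared = ['a', 'b', 'c'].map(label => sharedChain.trigger(label));
shared.forEach(req => req.respond());

console.log(separateLog.length); // 3
console.log(sharedLog);          // ['response to c']
```

In the first case no chain ever sees a second trigger, so nothing is cancelled. In the second case each trigger supersedes the previous request.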

I don't know how you trigger the execution of your requests.

If you want to execute your request every 500 ms, you could try this:

pollTasks() {
  return Observable.interval(500)
                .switchMap(() => run())
                .map(res => res.json());
}

In this case, any request still in progress after 500 ms is canceled so that the new one can run.

With this approach, you only need to call the pollTasks method once.

You can also trigger the asynchronous processing chain from user events, for example when characters are typed into an input:

var control = new Control();
// The control is attached to an input using the ngFormControl directive
control.valueChanges.switchMap(() => run())
                .map(res => res.json())
                .subscribe(...);

There is a proposal to make it easier to start a processing chain from DOM events (fromEvent).

See this link:

  • https://github.com/angular/angular/issues/4062


Answer 4:

You can use a Subject, but then you have to manage both subscribing and publishing yourself. If you just want a method that returns an Observable and cancels requests made within a given interval, here is how I would do it:

observer: Observer<CustomObject>;
debug = 0;

myFunction(someValue) {
    this.observable = new Observable<CustomObject>(observer => {
        if (!this.observer) {
            this.observer = observer;
        }

       // we simulate http response time
        setTimeout(() => {
            this.observer.next(someValue);
            this.debug++;
        }, 1000);
    })
    .debounceTime(3000) // We cancel requests within a 3s interval and take only the last one
    .switchMap(fn)
    .do(() => {
        console.log("debug", this.debug);
    });
}

Using the same observer throughout lets us cancel requests however we want (debounceTime, distinctUntilChanged, ...).
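To see what "take only the last one" within a 3 s interval means, here is a rough dependency-free model of the debounce rule applied to a list of timestamped events. It is a sketch, not RxJS's implementation: `TimedEvent`, `debounceModel`, and the sample timestamps are made up for illustration.

```typescript
// Hypothetical event shape: `at` is a timestamp in milliseconds.
interface TimedEvent<T> {
  at: number;
  value: T;
}

// Keep an event only if no later event arrives within `quietMs` after it.
// The final event is always kept, since nothing follows it.
function debounceModel<T>(events: TimedEvent<T>[], quietMs: number): T[] {
  const sorted = events.slice().sort((a, b) => a.at - b.at);
  const kept: T[] = [];
  sorted.forEach((event, i) => {
    const next = sorted[i + 1];
    if (next === undefined || next.at - event.at >= quietMs) {
      kept.push(event.value);
    }
  });
  return kept;
}

const survivors = debounceModel(
  [
    { at: 0, value: 'first' },
    { at: 1000, value: 'second' },
    { at: 2500, value: 'third' },
    { at: 9000, value: 'fourth' },
  ],
  3000,
);
console.log(survivors); // ['third', 'fourth']
```

Only the values that survive the quiet period would reach switchMap, so a burst of calls within 3 s results in a single request.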