How to unsubscribe or dispose an interval observable

Posted 2019-07-28 23:30

I'm new to Rx and reactive programming, but I have to deal with the following situation. I want an interval observable to check a hardware device's status by sending a POST request to its REST API every 500 ms and seeing whether the response changes. Once it changes, I want the interval observable and its POST requests to shut down immediately, freeing resources for future operations. Here is the relevant code.

myLoop(item_array:number[], otheroption : string){
    for (let item of item_array){
        //set hardware option, still a request
        this.configHardware(item,otheroption)
        //after config the hardware, command hardware take action
            .flatMap(() => {
                //this return a session id for check status
                this.takeHardwareAction()
                    .flatMap( (result_id) => {
                        //I want every 500ms check if hardware action is done or not
                        let actionCheckSubscription = IntervalObservable.create(500).subscribe(
                            () => {
                                //So my question is, can I unsubscribe the actionCheckSubscription in here on condition change? For example, 
                                if (this.intervalCheckingStatus(result_id))
                                    actionCheckSubscription.unsubscribe() ;
                            }
                        ) ;
                    })

            })
    }
}

2 Answers
Deceive 欺骗
#2 · 2019-07-28 23:56

You could use Observable.from and concatMap to iterate through all the items and then use a filter in combination with take(1) to stop an interval as soon as the validation passes the filter:

myLoop(item_array:number[], otheroption : string) {
    return Observable.from(item_array)
        .concatMap(item => this.configHardware(item, otheroption)
            .switchMap(resultId => Observable.interval(500)
                // build the status check inside switchMap so that a fresh check is made on every tick
                .switchMap(() => this.intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4], "fooBar")
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occurred", error),
        () => console.log("All items are valid now")
    );

Here is a live example with mock data:

const Observable = Rx.Observable;

function myLoop(item_array) {
    return Observable.from(item_array)
        // if you don't mind the execution-order you can use "mergeMap" instead of "concatMap"
        .concatMap(item => configHardwareMock(item)
            .switchMap(resultId => Observable.interval(500)
                .do(() => console.info("Checking status of: " + resultId))
                .switchMap(() => intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4])
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occurred", error),
        () => console.log("All items are valid now")
    );

// mock helpers
function configHardwareMock(id) {
  return Observable.of(id);  
}

function intervalCheckingStatus(resultId) {
  if (Math.random() < .4) {
    return Observable.of(false);
  }
  
  return Observable.of(true);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
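
To see why filter combined with take(1) stops the polling, here is a minimal sketch in plain TypeScript. It only models the operator chain and is not RxJS itself: makeFakeHardware and firstValidTick are hypothetical names, and the canned responses are made-up data.

```typescript
// Plain-TypeScript model of "interval -> check -> filter -> take(1)"; NOT RxJS itself.

// Hypothetical stand-in for the hardware: replays canned status responses, one per tick.
function makeFakeHardware(responses: boolean[]) {
    let checks = 0;
    return {
        checkStatus: (): boolean => responses[checks++],
        checksMade: (): number => checks,
    };
}

// Run one status check per tick and stop at the first valid status.
// Returns the zero-based tick that passed, or -1 if maxTicks ran out first.
function firstValidTick(
    checkStatus: () => boolean,
    isValid: (status: boolean) => boolean,
    maxTicks: number
): number {
    for (let tick = 0; tick < maxTicks; tick++) {
        if (isValid(checkStatus())) {
            return tick; // like take(1): no further ticks are processed
        }
    }
    return -1;
}

const hardware = makeFakeHardware([false, false, true, false, true]);
const stoppedAt = firstValidTick(hardware.checkStatus, Boolean, 5);

console.log(stoppedAt);             // 2
console.log(hardware.checksMade()); // 3
```

Only three checks are made even though five responses were queued, which mirrors the interval being torn down as soon as the first valid status passes the filter.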

姐就是有狂的资本
#3 · 2019-07-29 00:09

So you want to make a POST request every 500 ms and then check its response. I assume your method intervalCheckingStatus evaluates the POST response and determines whether it's different?
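
If that assumption holds, the check could look roughly like the sketch below. This is only a guess at what such a method might do: makeChangeDetector is a hypothetical helper, and the "busy" and "done" strings are invented response values.

```typescript
// Hypothetical helper: remembers the first response it sees and reports
// whether a later response differs from that baseline.
function makeChangeDetector<T>(
    equals: (a: T, b: T) => boolean = (a, b) => a === b
): (current: T) => boolean {
    let baseline: T | undefined;
    let hasBaseline = false;
    return (current: T): boolean => {
        if (!hasBaseline) {
            // The first response only establishes the baseline.
            baseline = current;
            hasBaseline = true;
            return false;
        }
        return !equals(baseline as T, current);
    };
}

const hasChanged = makeChangeDetector<string>();

console.log(hasChanged("busy")); // false (baseline)
console.log(hasChanged("busy")); // false (unchanged)
console.log(hasChanged("done")); // true  (differs from the baseline)
```

A predicate like this can be passed straight to a filter, so that only the first changed response gets through.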

First, I wouldn't use IntervalObservable directly. Have you imported the RxJS module? It's the third-party library endorsed by Angular and the one used in all their developer guide samples. If not, install it and import it. https://github.com/ReactiveX/rxjs

import * as Rx from 'rxjs/Rx';

I assume you've already imported Http, ResponseOptions etc but here it is in case others are curious:

import { Http, Response, ResponseOptions } from '@angular/http';

EDIT 1: Forgot to include the dependency injection. Inject Http into your constructor. I've named it http, which is why I call this.http.post

constructor(private http: Http) {}

Then, I would do the following:

EDIT 2: This would be inside your loop where the post arguments are relevant to the item in your array.

    // Every 500 ms, make a POST request
    Rx.Observable.interval(500)
        // Add your POST arguments here.
        // switchMap (not map) subscribes to the request, so the Response itself is passed downstream;
        // with map, the filter below would receive the unsubscribed Observable instead.
        .switchMap(() => this.http.post(yourUrl, yourBody))
        // This filter only emits when intervalCheckingStatus returns true.
        // Pass in whichever property of the Response you need; the status code is used here as an example.
        .filter(resp => this.intervalCheckingStatus(resp.status))
        // take(1) takes only the first emitted value and then the observable completes, so you do NOT need to unsubscribe explicitly.
        .take(1);

If you need to do something once the response has the status (or whatever property) you're looking for, then chain a .subscribe to the end and perform the action you need in there. Again, due to take(1), as soon as the first element is pumped, the observable stream completes and you do not need to unsubscribe.
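
To illustrate that completion behaviour, here is a hand-rolled model of take(1) in plain TypeScript. It is a sketch of the idea, not how RxJS implements it: SimpleObserver and takeFirst are hypothetical names, and the teardown callback stands in for whatever stops the interval.

```typescript
// Hand-rolled model of take(1); the names below are hypothetical and NOT RxJS API.
interface SimpleObserver<T> {
    next(value: T): void;
    complete(): void;
}

// Wrap an observer so that it forwards only the first value, then signals
// completion and runs the teardown (e.g. stopping a timer) exactly once.
function takeFirst<T>(observer: SimpleObserver<T>, teardown: () => void): SimpleObserver<T> {
    let done = false;
    const finish = (): void => {
        if (done) { return; }
        done = true;
        observer.complete();
        teardown();
    };
    return {
        next(value: T): void {
            if (done) { return; } // values after the first are ignored
            observer.next(value);
            finish();
        },
        complete: finish,
    };
}

const events: string[] = [];
let teardownCalls = 0;

const wrapped = takeFirst<string>(
    {
        next: value => { events.push("next:" + value); },
        complete: () => { events.push("complete"); },
    },
    () => { teardownCalls++; }
);

wrapped.next("done");    // first matching response
wrapped.next("ignored"); // arrives after completion and is dropped
wrapped.complete();      // has no effect the second time

console.log(events);        // [ 'next:done', 'complete' ]
console.log(teardownCalls); // 1
```

The teardown runs once, right after the first value, without the caller ever calling unsubscribe.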

Also, here is a very helpful website: http://rxmarbles.com/#take You can see that in their example, the resulting observable is complete (vertical line) after 2 elements are taken.
