I'm new to Rx and reactive programming, but I have to deal with the following situation. I want an interval observable that checks a hardware device's status by sending a POST request to its REST API every 500 ms, to see whether the response changes. Once it changes, I want the interval observable to stop sending POST requests immediately, freeing resources for later operations. Here is the relevant code:
myLoop(item_array:number[], otheroption : string){
    for (let item of item_array){
        // set the hardware option; this is also a request
        this.configHardware(item,otheroption)
            // after configuring the hardware, command it to take action
            .flatMap(() => {
                // this returns a session id used to check the status
                this.takeHardwareAction()
                    .flatMap( (result_id) => {
                        // every 500 ms, I want to check whether the hardware action is done
                        let actionCheckSubscription = IntervalObservable.create(500).subscribe(
                            () => {
                                // So my question is: can I unsubscribe actionCheckSubscription in here when the condition changes? For example,
                                if (this.intervalCheckingStatus(result_id))
                                    actionCheckSubscription.unsubscribe() ;
                            }
                        ) ;
                    })
            })
    }
}
You could use Observable.from and concatMap to iterate through all the items, and then use a filter in combination with take(1) to stop the interval as soon as the validation passes the filter:
myLoop(item_array: number[], otheroption: string) {
    return Observable.from(item_array)
        .concatMap(item => this.configHardware(item, otheroption)
            // start the hardware action; as in the question, it emits the session id used for the status checks
            .switchMap(() => this.takeHardwareAction())
            .switchMap(resultId => Observable.interval(500)
                // run a fresh status check on every tick (assumes intervalCheckingStatus returns an Observable)
                .switchMap(() => this.intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic for deciding whether the status is valid; currently just a boolean cast
                .take(1) // complete after the first valid value, which also stops the interval
                .mapTo(item) // optional: map back to "item" so the subscriber is told which item finished
            )
        );
}
// usage:
myLoop([1,2,3,4], "fooBar")
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occurred", error),
        () => console.log("All items are valid now")
    );
Here is a live example with mock data:
const Observable = Rx.Observable;
function myLoop(item_array) {
    return Observable.from(item_array)
        // if you don't mind the execution order, you can use "mergeMap" instead of "concatMap"
        .concatMap(item => configHardwareMock(item)
            .switchMap(resultId => Observable.interval(500)
                .do(() => console.info("Checking status of: " + resultId))
                .switchMap(() => intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic for deciding whether the status is valid; currently just a boolean cast
                .take(1) // complete after the first valid value
                .mapTo(item) // optional: map back to "item" so the subscriber is told which item finished
            )
        );
}
// usage:
myLoop([1,2,3,4])
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occurred", error),
        () => console.log("All items are valid now")
    );
// mock helpers
function configHardwareMock(id) {
    return Observable.of(id);
}
function intervalCheckingStatus(resultId) {
    if (Math.random() < .4) {
        return Observable.of(false);
    }
    return Observable.of(true);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
So you want to make a POST request every 500 ms and then check its response. I assume your method intervalCheckingStatus evaluates the POST response and determines whether it's different?
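In case it helps, here is a rough sketch of what such a "has the response changed?" check could look like. The helper name makeChangeDetector and the JSON.stringify comparison are my own assumptions, not something from your code; adapt it to whatever part of the response you actually care about.

```typescript
// Hypothetical helper (not part of RxJS or Angular): returns a predicate that
// reports whether a response body differs from the first one it was given.
// JSON.stringify is used as a simple, key-order-sensitive comparison for plain data.
function makeChangeDetector<T>(): (current: T) => boolean {
  let baseline: string | undefined;
  return (current: T): boolean => {
    const serialized = JSON.stringify(current);
    if (baseline === undefined) {
      baseline = serialized; // the first response becomes the reference point
      return false;
    }
    return serialized !== baseline;
  };
}

// Usage with made-up response bodies: the third one differs from the first.
const hasChanged = makeChangeDetector<{ state: string }>();
const observations: boolean[] = [
  hasChanged({ state: "busy" }),
  hasChanged({ state: "busy" }),
  hasChanged({ state: "done" }),
];
```

A predicate like this could be passed to a filter so that only the first changed response gets through.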
First, I wouldn't use IntervalObservable.
Have you imported the RxJS module? It's the third-party library endorsed by Angular and the one used in all of its developer guide samples. If not, install it and import it.
https://github.com/ReactiveX/rxjs
import * as Rx from 'rxjs/Rx';
I assume you've already imported Http, ResponseOptions, etc., but here is the import in case others are curious:
import { Http, Response, ResponseOptions } from '@angular/http';
EDIT 1: I forgot to include the dependency injection. Inject Http into your constructor. I've called it http, which is why I call this.http.post below.
constructor(private http: Http) {}
Then, I would do the following:
EDIT 2: This would be inside your loop where the post arguments are relevant to the item in your array.
// Every 500 ms, make a POST request
Rx.Observable.interval(500)
    // Add your POST arguments here
    // switchMap (rather than map) subscribes to the request, so the filter below receives the Response and not an Observable
    .switchMap(() => this.http.post(yourUrl, yourBody))
    // This filter is here so that the stream only emits when intervalCheckingStatus returns true
    // You need to get the property you need from the Response: resp
    // Is it the status code you're interested in? That's what I put as an example here, but whatever it is, pass it to your method
    .filter(resp => this.intervalCheckingStatus(resp.status))
    // take(1) takes only the first emitted value, and once it does that, the observable completes. So you do NOT need to unsubscribe explicitly.
    .take(1);
If you need to do something once the response has the status (or whatever property) you're looking for, then chain a .subscribe to the end and perform the action you need in there. Again, due to take(1), as soon as the first element is pumped, the observable stream completes and you do not need to unsubscribe.
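To make the "no need to unsubscribe" point concrete, here is a small, dependency-free sketch. It is a hand-rolled model, not RxJS itself: manualTicker, filter and take are hypothetical stand-ins written for illustration only, and manualTicker replaces the 500 ms timer so the behaviour can be followed step by step.

```typescript
// Hypothetical, dependency-free model of an observable pipeline.
// It only imitates the behaviour described above; it is NOT the RxJS implementation.
type Observer<T> = { next: (value: T) => void; complete: () => void };
type Teardown = () => void;
type Source<T> = (observer: Observer<T>) => Teardown;

// Manually driven stand-in for an interval: each tick() emits 0, 1, 2, ...
function manualTicker() {
  let listeners: Array<(n: number) => void> = [];
  let count = 0;
  const source: Source<number> = (observer) => {
    const listener = (n: number) => observer.next(n);
    listeners.push(listener);
    // The teardown removes the listener, comparable to clearing the interval timer.
    return () => {
      listeners = listeners.filter((l) => l !== listener);
    };
  };
  return {
    source,
    tick: (): void => {
      const n = count++;
      listeners.slice().forEach((l) => l(n));
    },
    activeListeners: (): number => listeners.length,
  };
}

// Forwards only the values that satisfy the predicate.
function filter<T>(source: Source<T>, predicate: (value: T) => boolean): Source<T> {
  return (observer) =>
    source({
      next: (value) => {
        if (predicate(value)) observer.next(value);
      },
      complete: () => observer.complete(),
    });
}

// Forwards the first `limit` values, then tears down the source and completes.
// Simplification: assumes the source does not emit synchronously while subscribing.
function take<T>(source: Source<T>, limit: number): Source<T> {
  return (observer) => {
    let seen = 0;
    let done = false;
    let teardown: Teardown = () => {};
    teardown = source({
      next: (value) => {
        if (done) return;
        seen += 1;
        observer.next(value);
        if (seen >= limit) {
          done = true;
          teardown(); // detaches from the ticker; the caller never unsubscribes
          observer.complete();
        }
      },
      complete: () => {
        if (!done) {
          done = true;
          observer.complete();
        }
      },
    });
    return () => {
      done = true;
      teardown();
    };
  };
}

// Usage: pretend the hardware reports "done" on the third check (tick value 2).
const ticker = manualTicker();
const received: number[] = [];
const state = { completed: false };

take(filter(ticker.source, (n) => n >= 2), 1)({
  next: (value) => received.push(value),
  complete: () => {
    state.completed = true;
  },
});

ticker.tick(); // 0: filtered out
ticker.tick(); // 1: filtered out
ticker.tick(); // 2: passes the filter; take(1) forwards it, detaches and completes
ticker.tick(); // 3: nobody is listening any more
```

After the third tick the ticker has no listeners left, which is the model's equivalent of the interval being shut down once the first matching value arrives.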
Also, here is a very helpful website:
http://rxmarbles.com/#take
You can see that in their example, the resulting observable is complete (vertical line) after 2 elements are taken.