Can anybody explain what the differents between these 3 variants?
http://jsfiddle.net/8vx2g3fr/2/
- First works as excpect, all events are processed.
- But second loses last event (3)
- Third loses second event (2)
Could you please help me to understand what the issue is and how to make the third variant process all events?
1
let bs = new Rx.Subject();
bs
.subscribe(v=>{
console.log("in", v);
if (v % 2 == 0) {
setTimeout(()=>{
console.log(" out", v, "->" , v + 1);
bs.next(v+1);
}, 0);
}
});
bs.next(0);
bs.next(2);
Output:
in 0
in 2
out 0 -> 1
in 1
out 2 -> 3
in 3
2
let bs2 = new Rx.Subject();
bs2
.subscribe(v=>{
console.log("in", v);
if (v % 2 == 0) {
Rx.Observable.interval(0).take(1)
.map(()=>{console.log(" out", v, "->" , v + 1);return v+1;})
.subscribe(bs2);
}
});
bs2.next(0);
bs2.next(2);
Output:
in 0
in 2
out 0 -> 1
in 1
out 2 -> 3
3
let bs3 = new Rx.Subject();
bs3
.switchMap(v=>{
console.log("in", v);
if (v % 2 == 0) {
return Rx.Observable.interval(0).take(1)
.map(()=>{console.log(" out", v, "->" , v + 1);return v+1;});
}
return Rx.Observable.empty();
}).subscribe(bs3);
bs3.next(0);
bs3.next(2);
Output:
in 0
in 2
out 2 -> 3
in 3
This all is in fact expected behavior.
The confusing thing is what happens when you reuse
Subject
and an operator such astake()
multiple times.Operator
take(1)
takes just a single value and sendcomplete
notification. This notification is received by theSubject
because of.subscribe(bs2)
. Now comes the most important part.When a
Subject
receives acomplete
orerror
notification it marks itself as stopped. This means it will never remit any items or notifications which is correct and expected behavior in Rx. Notificationscomplete
orerror
have to be the last emissions.So the
Subject
is completed by the firsttake(1)
which is triggered by value0
(thebs2.next(0)
call).Then when value
2
triggers the second run of theObservable.interval(0).take(1)
it's received by theSubject
but it's automatically ignored because theSubject
is already marked as stopped.The process in you third demo is exactly the same.
You can see it in the source code in
Subject.ts
:https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L86
https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L56