I am learning angular 2 and rxjs. I have 3 variables, A B C.
- B depends on the value of A
- C depends on the value of A and B
I am trying to setup up the observables such that: When A is updated, B and C will be auto updated. When B is updated, C will be auto updated. I tried two setups but they are not satisfactory.
- First setup: B subscribes observable A; C subscribes observable B withlatestfrom A. The changes in A did cascade down to B then to C but the value from A is not the latest.
- Second setup: B subscribes observable A; C subscribes combineLatest observable of A and B. This setup works but I get two updates for C, first from B and then from A.
How can I set up my observables / subscription such that when A is updated, C will only get updated once with latest value from A and B?
EDIT - CODES ADDED
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
Code behaves this way because by default RxJs runs code synchronously. Chain of events for withLatestFrom
case:
- You push new
A
value: A.next(2);
- RxjS sees that there are two usages of
A
:
B
-subscription (comes first)
withLatestFrom
(comes second)
- RxJs calls
B
-subscription
- It pushes new value of
B
- RxjS sees that there is a usage of
B
: C
-subscription
- RxJs calls
C
-subscription (with old value of A
!)
- New value of
A
is pulled in withLatestFrom
but C
-subscription isn't triggered (because it is triggered only by B
updates)
Possible solution is to add debounceTime(0)
.It will force C
-subscription to run asynchronously when the value inside withLatestFrom
is already updated.
For more info google for RxJs's Scheduler
.
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.debounceTime(0).withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable).debounceTime(0);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
I want to add two things:
withLatestFrom
approach is better in your case because dependency graph looks like C -> B -> A
and when A is updated B is updated too. If A
and B
were independent then combineLatest
would be the better choice.
- No need in
Subject
for B
. You can rewrite code like this (no debounceTime
!)
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B_Observable = A_Observable.map(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
return newB;
});
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
Alright, so I tested with your code. Tweak to change a few things. You can see the code here: https://jsbin.com/zonedirogi/edit?html,js,console,output. The main change is actually instead of B with latest from A, change to A with latest from B instead. Then it work as expected.
FYI, you don't need to use asObservable()
to convert subject to observable.
var a = new Rx.BehaviorSubject(1);
var b = new Rx.BehaviorSubject(10);
a.subscribe(x => {
b.next(x * 10);
});
a.withLatestFrom(b).subscribe(x => {
console.log(x)
})
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
a.next(2);
}, 1000);