RxJS - Subscribing to interdependent observables

Posted 2019-09-04 17:45

Question:

I am learning Angular 2 and RxJS. I have three variables: A, B and C.

  • B depends on the value of A
  • C depends on the value of A and B

I am trying to set up the observables such that when A is updated, B and C are updated automatically, and when B is updated, C is updated automatically. I tried two setups, but neither is satisfactory.

  • First setup: B subscribes to observable A; C subscribes to observable B with withLatestFrom A. Changes in A did cascade down to B and then to C, but the value of A that C receives is not the latest.
  • Second setup: B subscribes to observable A; C subscribes to the combineLatest observable of A and B. This setup works, but C gets two updates, first from B and then from A.

How can I set up my observables / subscription such that when A is updated, C will only get updated once with latest value from A and B?

EDIT - CODE ADDED

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();

A_Observable.subscribe(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  B.next(newB);
});

// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable);
combineAllObservable.subscribe(function(data) {
  console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);

// SATISFACTORY RESULT
setTimeout(function(){
  console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);

The snippet loads RxJS 5.0.1:
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

Answer 1:

The code behaves this way because, by default, RxJS runs subscribers synchronously. The chain of events in the withLatestFrom case is:

  1. You push a new value of A: A.next(2);
  2. RxJS sees that A has two subscribers:
    1. the B-subscription (subscribed first)
    2. withLatestFrom (subscribed second)
  3. RxJS calls the B-subscription
    1. It pushes a new value of B
    2. RxJS sees that B has a subscriber: the C-subscription
    3. RxJS calls the C-subscription (with the old value of A!)
  4. The new value of A then reaches withLatestFrom, but the C-subscription isn't triggered (it is triggered only by B updates)
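
The chain of events above can be reproduced without RxJS. The sketch below is a simplified model, not the library's implementation: `MiniSubject` is an invented class that only calls its subscribers synchronously in subscription order, and `latestA` plays the role of the value that withLatestFrom caches for A.

```javascript
// Hypothetical stand-in for a BehaviorSubject: it stores a value and calls
// subscribers synchronously, in the order they subscribed. Not RxJS code.
class MiniSubject {
  constructor(value) {
    this.value = value;
    this.subscribers = [];
  }
  subscribe(fn) {
    this.subscribers.push(fn);
    fn(this.value); // replay the current value to the new subscriber
  }
  next(value) {
    this.value = value;
    this.subscribers.forEach(function (fn) { fn(value); });
  }
}

var A = new MiniSubject(1);
var B = new MiniSubject(10);
var seenByC = []; // [B, A] pairs, mirroring data[0] and data[1] in the question

// Subscriber 1 on A: recompute B (registered first, so it runs first).
A.subscribe(function (a) { B.next(a * 10); });

// Subscriber 2 on A: the cache that withLatestFrom would keep for A.
var latestA;
A.subscribe(function (a) { latestA = a; });

// C listens to B and reads the cached A.
B.subscribe(function (b) { seenByC.push([b, latestA]); });

A.next(2);
console.log(seenByC[seenByC.length - 1]); // B is 20, but A is still 1
```

In this model the cache is updated to 2 only after C has already run, which matches step 4.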

A possible solution is to add debounceTime(0). It forces the C-subscription to run asynchronously, by which time the value inside withLatestFrom has already been updated.

For more information, look up RxJS Schedulers.
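
Why deferring helps can be sketched in plain JavaScript. This is a simplified model under stated assumptions, not how debounceTime is implemented: `MiniSubject` and the `tasks` queue are invented for illustration, with `tasks` standing in for the timer a scheduler would use. C's callback is queued instead of run inline, so it executes after every synchronous subscriber of A has finished.

```javascript
// Hypothetical model, not RxJS: subscribers run synchronously in order.
class MiniSubject {
  constructor() { this.subscribers = []; }
  subscribe(fn) { this.subscribers.push(fn); }
  next(value) { this.subscribers.forEach(function (fn) { fn(value); }); }
}

var tasks = [];          // stand-in for timer callbacks waiting to run
var A = new MiniSubject();
var B = new MiniSubject();
var latestA;             // the value withLatestFrom would cache for A
var pendingB;            // newest B value waiting to be delivered
var scheduled = false;
var seenByC = [];        // [B, A] pairs

A.subscribe(function (a) { B.next(a * 10); });   // runs first
A.subscribe(function (a) { latestA = a; });      // runs second

// Debounce-like step: remember the newest B and deliver it later, once.
B.subscribe(function (b) {
  pendingB = b;
  if (!scheduled) {
    scheduled = true;
    tasks.push(function () {
      scheduled = false;
      seenByC.push([pendingB, latestA]); // latestA is read at delivery time
    });
  }
});

A.next(2);                                 // synchronous part
var deliveredDuringNext = seenByC.length;  // nothing delivered yet
tasks.splice(0).forEach(function (task) { task(); }); // the "timer" fires
console.log(deliveredDuringNext, seenByC); // 0, then one pair with B = 20 and A = 2
```

The same deferral also collapses several synchronous updates into a single delivery, which is why it removes the duplicate update in the combineLatest case.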

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();

A_Observable.subscribe(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  B.next(newB);
});

// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.debounceTime(0).withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable).debounceTime(0);
combineAllObservable.subscribe(function(data) {
  console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);

// SATISFACTORY RESULT
setTimeout(function(){
  console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);

I want to add two things:

  1. The withLatestFrom approach is better in your case because the dependency graph looks like C -> B -> A, and when A is updated B is updated too. If A and B were independent, combineLatest would be the better choice.
  2. There is no need for a Subject for B. You can rewrite the code like this (no debounceTime!):

var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();

var B_Observable = A_Observable.map(function(A_value) {
  var newB = A_value * 10;
  // console.log("B auto updating to " + newB);
  return newB;
});

var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
  console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  A.next(2);
},1000);

Answer 2:

Alright, so I tested your code and tweaked a few things. You can see the code here: https://jsbin.com/zonedirogi/edit?html,js,console,output. The main change is to use A withLatestFrom B instead of B withLatestFrom A. Then it works as expected.

FYI, you don't need asObservable() to convert a Subject to an Observable; a Subject can be subscribed to directly.

var a = new Rx.BehaviorSubject(1);
var b = new Rx.BehaviorSubject(10);

a.subscribe(x => {
  b.next(x * 10);
});

a.withLatestFrom(b).subscribe(x => {
  console.log(x)
})

// UPDATE TO A
setTimeout(function(){
  console.log("UPDATING A");
  a.next(2);
}, 1000);
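
One caveat worth checking: because subscribers run synchronously in subscription order, this version appears to depend on the subscription that updates b being registered before the one that reads it. The sketch below is a plain-JavaScript model, not RxJS; `MiniSubject` and `run` are invented names, and reading `b.value` directly is a simplification of the cached value withLatestFrom would hold.

```javascript
// Hypothetical model, not RxJS: a value holder that notifies synchronously.
class MiniSubject {
  constructor(value) { this.value = value; this.subscribers = []; }
  subscribe(fn) { this.subscribers.push(fn); fn(this.value); }
  next(value) {
    this.value = value;
    this.subscribers.forEach(fn => fn(value));
  }
}

// Returns the [a, b] pair that C sees after a.next(2).
function run(updateBFirst) {
  const a = new MiniSubject(1);
  const b = new MiniSubject(10);
  const seen = [];
  // Subscription that keeps b in sync with a.
  const updateB = () => a.subscribe(x => b.next(x * 10));
  // C is triggered by a and reads b's current value (the "latest from b").
  const listenC = () => a.subscribe(x => seen.push([x, b.value]));

  if (updateBFirst) { updateB(); listenC(); } else { listenC(); updateB(); }
  a.next(2);
  return seen[seen.length - 1];
}

console.log(run(true));  // [2, 20]: b was refreshed before C ran
console.log(run(false)); // [2, 10]: C ran before b was refreshed
```

If the order of subscriptions cannot be guaranteed, deriving B from A with map, as in Answer 1, avoids the question entirely.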