Diffing two Observables

2019-08-19 05:35发布

问题:

I'm looking for the best way to diff two Observables. Filtered values from ObservableA should be emitted as soon as ObservableB completes, without waiting for ObservableA to complete.

<html>
  <head>
    <title></title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>
    <script>

    // Source A: one value every 2 s, ten values in total (0 through 9).
    const observable_a = Rx.Observable
        .interval(2000)
        .take(10);

    // Source B: one value every 1 s, shifted by 3, five values in total (3 through 7).
    const observable_b = Rx.Observable
        .interval(1000)
        .map(value => value + 3)
        .take(5);

    // Desired result: every value of A that never appears in B, i.e. 0, 1, 2, 8, 9.
    // someDiffObservable is the operator being asked for; it is not defined here.
    someDiffObservable(observable_a, observable_b)
        .subscribe(console.log);
    
    </script>
  </head>
  <body></body>
</html>

回答1:

Try this:

// a$ is shared because it is subscribed twice below: once as the source of
// buffer() and once as the tail of the buffer notifier.
const a$ = Rx.Observable.interval(2000).take(10).share();
// NOTE(review): b$ is not shared and is also subscribed twice below (inside the
// notifier and by toArray()), so its interval presumably runs twice — confirm
// this is acceptable if b$ is replaced by a source with side effects.
const b$ = Rx.Observable.interval(1000).map(x=>x+3).take(5);

// Combine each chunk of buffered a$ values with the full list of b$ values and
// keep only the a$ values that b$ never produced.
Rx.Observable.combineLatest(
    // Collect a$ values until the notifier fires. The notifier fires first when
    // b$ completes (last() emits on completion; startWith(null) gives last() a
    // value even when b$ is empty) and afterwards once for every a$ value.
    a$.buffer(
      b$.startWith(null).last().concat(a$)
    ), 
    // Every b$ value gathered into one array, emitted when b$ completes.
    b$.toArray(),  
    // Drop the buffered a$ values that are present in the b$ array.
    (aItems, bItems) => aItems.filter(a => !bItems.includes(a))
  )
  // Unpack each filtered chunk back into individual values, in order.
  .concatMap(filteredItems => Rx.Observable.from(filteredItems))
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>



回答2:

Currently I've come up with the following function to diff two observables.

Is there a simpler/faster/better way to achieve this?

<html>
  <head>
    <title></title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>
    <script>

    // Source A: one value every 2 s, ten values in total (0 through 9).
    const observable_a = Rx.Observable
        .interval(2000)
        .take(10);
    // Source B: one value every 1 s, shifted by 3, five values in total (3 through 7).
    const observable_b = Rx.Observable
        .interval(1000)
        .map(value => value + 3)
        .take(5);

    
    /**
     * Emits the values of `a` that pass `filter` against the values of `b`.
     *
     * Values of `a` that arrive before `b` has completed are buffered; they are
     * checked and flushed the moment `b` completes. Values of `a` that arrive
     * afterwards are checked and forwarded immediately. The result completes
     * once both sources have completed.
     *
     * Errors from either source are forwarded to the subscriber, and
     * unsubscribing from the result unsubscribes from both sources.
     *
     * @param {Rx.Observable} a - Source whose values are to be filtered.
     * @param {Rx.Observable} b - Source whose values form the blacklist.
     * @param {function(*, Array): boolean} [filter] - Called with a value of `a`
     *     and the array of all `b` values; return true to emit the value.
     *     Defaults to "value is not contained in the blacklist".
     * @returns {Rx.Observable} Observable of the values of `a` that passed `filter`.
     */
    function observableDiff(a,b,filter) {
        if(!filter) {
            filter = (value_to_check,blacklist_array)=>{
                return blacklist_array.indexOf(value_to_check)===-1;
            };
        }
        return Rx.Observable.create(observer=>{
            let a_values = [];
            const b_values = [];
            let a_completed = false;
            let b_completed = false;

            const emitIfAllowed = a_value=>{
                if(filter(a_value,b_values)) {
                    observer.next(a_value);
                }
            };

            // subscribe() is used instead of forEach() so that source errors can
            // be forwarded and the subscriptions can be cancelled on teardown.
            const a_subscription = a.subscribe(
                a_value=>{
                    if(b_completed) {
                        emitIfAllowed(a_value);
                    } else {
                        // The blacklist is not final yet; hold the value back.
                        a_values.push(a_value);
                    }
                },
                error=>{
                    observer.error(error);
                },
                ()=>{
                    a_completed = true;
                    if(b_completed) {
                        observer.complete();
                    }
                }
            );

            const b_subscription = b.subscribe(
                b_value=>{
                    b_values.push(b_value);
                },
                error=>{
                    observer.error(error);
                },
                ()=>{
                    b_completed = true;
                    // The blacklist is final: flush everything buffered so far.
                    const buffered_values = a_values;
                    a_values = [];
                    buffered_values.forEach(emitIfAllowed);
                    if(a_completed) {
                        observer.complete();
                    }
                }
            );

            // Teardown: stop listening to both sources when the consumer leaves.
            return ()=>{
                a_subscription.unsubscribe();
                b_subscription.unsubscribe();
            };
        });
    }
    
    // Logs 0, 1, 2, 8, 9: the values of observable_a that observable_b never emits.
    observableDiff(observable_a, observable_b)
        .subscribe(console.log);
    
    </script>
  </head>
  <body></body>
</html>