Can I observe additions to an array with rx.js?

Posted 2020-02-25 00:30

Reference: the fromArray page of the Rx wiki on GitHub

coffee> rext = require 'rx'
coffee> arr = [1..5]
[ 1, 2, 3, 4, 5 ]
coffee> obs = rext.Observable.fromArray(arr)
{ _subscribe: [Function] }
coffee> obs.subscribe( (x) -> console.log("added value: " + x))
added value: 1
added value: 2
added value: 3
added value: 4
added value: 5
{ isStopped: true,
  observer:
   { isStopped: true,
     _onNext: [Function],
     _onError: [Function: defaultError],
     _onCompleted: [Function: noop] },
  m: { isDisposed: true, current: null } }
coffee> arr.push(12)    # expecting "added value: 12"
6                       # instead got new length of array
coffee>

It looks like the subscribe callback only fires once, at the moment I subscribe, for the items already in the array. The name seems like a bit of a misnomer, since I'm really just for-eaching over the array instead of observing changes to it. The code is almost exactly the same as what's on the wiki, though. So either I'm doing it wrong or subscribe doesn't work the way I expect.

4 Answers
淡お忘
#2 · 2020-02-25 00:49

Observable.fromArray creates an Observable that fires an event for each item in the array as soon as you add a subscriber. It does not keep "watching" the array for later changes.
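To illustrate why the later push goes unnoticed, here is a minimal sketch in plain JavaScript. `fromArraySketch` is a hypothetical stand-in written for this example, not the RxJS implementation; it only mimics the behaviour described above, where the current contents are replayed to each new subscriber and nothing is attached to the array itself.

```javascript
// Hypothetical stand-in for Observable.fromArray (illustration only, not RxJS code).
function fromArraySketch(items) {
  return {
    subscribe(onNext) {
      // Replay whatever is in the array right now, then finish.
      for (const item of items) {
        onNext(item);
      }
    }
  };
}

const arr = [1, 2, 3];
const seen = [];
fromArraySketch(arr).subscribe((x) => seen.push(x));

arr.push(12);        // nothing is listening to the array itself
console.log(seen);   // [ 1, 2, 3 ]
```

The subscriber only ever sees what was in the array when it subscribed, which matches the REPL session in the question.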

If you need a "pushable collection", the Bus class in Bacon.js might be what you're looking for. For RxJS there's my little MessageQueue class, which offers similar functionality.

一纸荒年 Trace。
#3 · 2020-02-25 00:51

In RxJS what you are looking for is called a Subject. You can push data into it and stream it from there.

Example:

var array = [];
var arraySubject = new Rx.Subject();

// Add items through this helper so the subject is notified of every addition
var pushToArray = function (item) {
  array.push(item);
  arraySubject.next(item);
};

// Subscribe to the subject to react to changes
arraySubject.subscribe((item) => console.log(item));
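For readers who want to see the mechanism without depending on a particular RxJS version, the same pattern can be sketched in plain JavaScript. `TinySubject` and `pushItem` are hypothetical names made up for this example; the class is only a rough approximation of what a Subject does (keep a list of callbacks, call each one on `next`), not the library's implementation.

```javascript
// TinySubject is a hypothetical stand-in for Rx.Subject, for illustration only.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(onNext) {
    this.observers.push(onNext);
    // Return a handle that removes the callback again.
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== onNext);
      }
    };
  }
  next(value) {
    // Iterate over a copy so unsubscribing during delivery is safe.
    for (const observer of this.observers.slice()) {
      observer(value);
    }
  }
}

const items = [];
const additions = new TinySubject();
const received = [];

// All additions go through this helper so subscribers are notified.
function pushItem(item) {
  items.push(item);
  additions.next(item);
}

const subscription = additions.subscribe((item) => received.push(item));
pushItem('a');
pushItem('b');
subscription.unsubscribe();
pushItem('c');            // stored in items, but no longer delivered

console.log(items);       // [ 'a', 'b', 'c' ]
console.log(received);    // [ 'a', 'b' ]
```

Two caveats apply to this sketch and to the wrapper approach in general: items pushed before anyone subscribes are not replayed, and calling `items.push` directly bypasses the notification.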
迷人小祖宗
#4 · 2020-02-25 01:03

How about this:

var subject = new Rx.Subject();
// scan example: build up an array over time
// (RxJS 5 style; in RxJS 6+ use subject.pipe(scan(...)) instead)
var example = subject.scan((acc, curr) => acc.concat(curr), []);

//log accumulated values
var subscribe = example.subscribe(val => 
    console.log('Accumulated array:', val)
);

//next values into subject
subject.next(['Joe']);
subject.next(['Jim']);
女痞
#5 · 2020-02-25 01:05

I found Rx.Observable.ofObjectChanges(obj) to work just as I expected.

From the documentation page:

Creates an Observable sequence from changes to an object using Object.observe.

Hope it helps.
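One caveat: Object.observe has since been withdrawn from standardization and removed from JavaScript engines, so this approach may not work on current runtimes. A rough substitute, swapping in a standard ES2015 Proxy instead of Object.observe, is sketched below. `watchArray` is a hypothetical helper name invented for this example and is not part of RxJS; the sketch only reports writes to indices that did not exist before.

```javascript
// watchArray is a hypothetical helper (not part of RxJS), sketched with an ES2015 Proxy.
function watchArray(target, onAdd) {
  return new Proxy(target, {
    set(obj, prop, value) {
      // Array indices arrive as numeric strings such as "3".
      const isIndex = typeof prop === 'string' && /^\d+$/.test(prop);
      const isNew = isIndex && !(prop in obj);
      obj[prop] = value;
      if (isNew) {
        onAdd(value, Number(prop));
      }
      return true;
    }
  });
}

const added = [];
const watched = watchArray([1, 2, 3], (value, index) => added.push([index, value]));

watched.push(12);     // push sets index 3, then updates length
watched[0] = 99;      // overwrites an existing index, so it is not reported

console.log(added);   // [ [ 3, 12 ] ]
```

Changes made through the original array reference rather than the proxy are not seen, so the proxy has to be the only handle the rest of the code uses. The callback could just as well call `next` on a Subject to turn the additions into a stream.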
