coffee> rext = require 'rx'
coffee> arr = [1..5]
[ 1, 2, 3, 4, 5 ]
coffee> obs = rext.Observable.fromArray(arr)
{ _subscribe: [Function] }
coffee> obs.subscribe( (x) -> console.log("added value: " + x))
added value: 1
added value: 2
added value: 3
added value: 4
added value: 5
{ isStopped: true,
{ isStopped: true,
_onNext: [Function],
_onError: [Function: defaultError],
_onCompleted: [Function: noop] },
m: { isDisposed: true, current: null } }
coffee> arr.push(12) # expecting "added value: 12"
6 # instead got new length of array
It really looks like the subscribe function will only fire one time, when it's created. It seems like a bit of a misnomer, since I'm really just for-eaching the array instead of observing changes on it. That code is almost exactly the same as what's on the wiki, though. So either I'm doing it wrong or subscribe doesn't work how I expect.
Observable.fromArray creates an Observable that immediately fires an event for each array item when you add a subscriber. So it won't be "watching" for changes to that array.
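To make that concrete, here is a minimal, dependency-free sketch of the behaviour. `fromArraySketch` is a hypothetical stand-in written for illustration, not the RxJS implementation: each call to `subscribe` simply walks the array as it exists at that moment and then finishes, so a later `push` has nobody left to notify.

```javascript
// Hypothetical stand-in for Observable.fromArray (an assumption for
// illustration only; this is NOT how RxJS implements it internally).
function fromArraySketch(array) {
  return {
    subscribe: function (onNext) {
      // Each subscription iterates the array as it is right now, then ends.
      for (var i = 0; i < array.length; i++) {
        onNext(array[i]);
      }
    }
  };
}

var arr = [1, 2, 3, 4, 5];
var obs = fromArraySketch(arr);

var firstRun = [];
obs.subscribe(function (x) { firstRun.push(x); });

// The first subscription has already finished, so this notifies no one.
arr.push(12);

// A fresh subscription walks the array again, including the new item.
var secondRun = [];
obs.subscribe(function (x) { secondRun.push(x); });

console.log(firstRun);
console.log(secondRun);
```

In this sketch the only way to see the pushed value is to subscribe again, which is the "for-eaching" behaviour the question describes.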
If you need a "pushable collection", the Bus class in Bacon.js might be what you're looking for. For RxJS there's my little MessageQueue class that has similar functionality.
In RxJS what you are looking for is called a Subject. You can push data into it and stream it from there. Example:
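A minimal sketch of that push-and-stream idea, assuming the type meant here is a Subject. `SubjectSketch` is a hypothetical, dependency-free stand-in rather than the library class. Its `onNext`, `subscribe` and `dispose` methods are named after the older RxJS API that the REPL output in the question suggests (`_onNext`, `isDisposed`). With the real `rx` package the corresponding calls would presumably be `new Rx.Subject()`, `subject.onNext(value)` and `subject.subscribe(fn)`.

```javascript
// Hypothetical stand-in for a Subject (an assumption for illustration only;
// this is NOT the RxJS implementation).
function SubjectSketch() {
  this.observers = [];
}

// Register a callback; returns a handle that can unsubscribe it later.
SubjectSketch.prototype.subscribe = function (onNext) {
  var observers = this.observers;
  observers.push(onNext);
  return {
    dispose: function () {
      var i = observers.indexOf(onNext);
      if (i !== -1) observers.splice(i, 1);
    }
  };
};

// Push a value to every callback that is currently subscribed.
SubjectSketch.prototype.onNext = function (value) {
  // Iterate over a copy so that disposing during delivery is safe.
  this.observers.slice().forEach(function (observer) {
    observer(value);
  });
};

var subject = new SubjectSketch();
var seen = [];
var subscription = subject.subscribe(function (x) {
  seen.push("added value: " + x);
});

subject.onNext(1);
subject.onNext(12);      // delivered: the subscription is still live
subscription.dispose();
subject.onNext(99);      // not delivered: nobody is subscribed any more

console.log(seen);
```

Unlike the array case, values pushed after subscribing reach the subscriber, because the data source and the subscription stay connected until `dispose` is called.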
How about this:
I found Rx.Observable.ofObjectChanges(obj) to work just as I expected.
From the documentation page:
Hope it helps.