I'm playing with Reactive Programming, using RxJS, and stumbled upon something I'm not sure how to solve.
Let's say we implement a vending machine. You insert a coin, select an item, and the machine dispenses an item and returns change. We'll assume that price is always 1 cent, so inserting a quarter (25 cents) should return 24 cents back, and so on.
The "tricky" part is that I'd like to be able to handle cases like the user inserting 2 coins and then selecting an item, or selecting an item without inserting a coin.
It seems natural to implement inserted coins and selected items as streams. We can then introduce some sort of dependency between these 2 actions — merging or zipping or combining latest.
However, I quickly ran into an issue where I'd like coins to be accumulated up until an item is dispensed but not further. AFAIU, this means I can't use sum or scan, since there's no way to "reset" the previous accumulation at some point.
Here's an example diagram:
coins:    ---25---5-----10------------|->
acc:      ---25---30----40------------|->
items:    ------------foo-----bar-----|->
combined: ---------30,foo--40,bar--|->
change:   ------------29------39------|->
And the corresponding code:
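this.getCoinsStream()
  // running total of all coins ever inserted (never reset)
  .scan(function(sum, current) { return sum + current; })
  // pair the latest total with the latest selected item
  .combineLatest(this.getSelectedItemsStream(), function(cents, item) {
    return { cents: cents, item: item };
  })
  .subscribe(function(pair) {
    dispenseItem(pair.item);
    dispenseChange(pair.cents - 1);
  });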
25 and 5 cents were inserted and then "foo" item was selected. Accumulating coins and then combining latest would lead to "foo" being combined with "30" (which is correct) and then "bar" with "40" (which is incorrect; should be "bar" and "10").
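To make the mismatch concrete, here is a small library-free sketch (plain JavaScript, not RxJS) that replays the coins from the diagram through a running total with no reset, which is what scan computes. The variable names are made up for illustration.

```javascript
// Coins from the diagram, in insertion order (cents).
const coins = [25, 5, 10];

// Running total with no reset, i.e. what scan(sum) would emit.
const acc = [];
coins.reduce((sum, cents) => {
  const next = sum + cents;
  acc.push(next);
  return next;
}, 0);

// "foo" is selected after the 2nd coin, "bar" after the 3rd.
const seenByFoo = acc[1];     // 30, which is what I want
const seenByBar = acc[2];     // 40, because the total was never reset
const wantedByBar = coins[2]; // 10, only the coin inserted since "foo"

console.log({ acc, seenByFoo, seenByBar, wantedByBar });
```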
I looked through all of the methods for grouping and filtering and don't see anything that I can use.
An alternative solution I could use is to accumulate coins separately. But this introduces state outside of a stream and I'd really like to avoid that:
var centsDeposited = 0;
this.getCoinsStream().subscribe(function(cents) {
  centsDeposited += cents;
});
this.getSelectedItemsStream().subscribe(function(item) {
  dispenseItem(item);
  dispenseChange(centsDeposited - 1);
  centsDeposited = 0;
});
Moreover, this doesn't allow for making the streams dependent on each other, such as waiting for a coin to be inserted before a selection can return an item.
Am I missing an existing method? What's the best way to achieve something like this: accumulating values up until the moment they need to be merged with another stream, while also waiting for at least 1 value in the 1st stream before merging it with one from the 2nd?
You could use your scan/combineLatest approach and then finish the stream with a first followed by a repeat, so that it "starts over" the stream without your Observers seeing it.
You can also use Window to group together multiple coin events, and use item selection as the window boundary.
Next we can use zip to acquire the item value.
Notice that we try to give out items immediately, so the user has to insert coins before deciding on an item.
Notice that I decided to publish both selectedStream and dispenser. This is for safety: we don't want a race condition where events fire while we're still building up the query, leaving zip unbalanced. That would be very rare, but if our sources were cold Observables, they would start generating values as soon as we subscribe, so we must use Publish to safeguard ourselves.
(Shamelessly stolen from paulpdaniels's example code.)
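As a rough, library-free model of the window-then-zip idea (this is plain JavaScript, not the RxJS code itself), the sketch below treats each item selection as a window boundary, sums the coins in each window, and pairs the sums with the items in order. The event shape ({ type, cents, name }) and the helper names windowSums, zip and dispense are assumptions made up for this illustration.

```javascript
// Sum the coins in each window; every item selection closes the current window.
function windowSums(events) {
  const sums = [];
  let current = 0;
  for (const e of events) {
    if (e.type === 'coin') {
      current += e.cents;
    } else if (e.type === 'item') {
      sums.push(current); // boundary: emit this window's sum and start a new one
      current = 0;
    }
  }
  return sums; // coins after the last selection stay in an unfinished window
}

// Pair up elements by position, stopping at the shorter input (like zip).
function zip(a, b) {
  const out = [];
  for (let i = 0; i < Math.min(a.length, b.length); i++) {
    out.push([a[i], b[i]]);
  }
  return out;
}

// Price is 1 cent, as in the question.
function dispense(events, price = 1) {
  const items = events.filter(e => e.type === 'item').map(e => e.name);
  return zip(windowSums(events), items)
    .map(([cents, item]) => ({ item, change: cents - price }));
}

// The sequence from the question's diagram.
const result = dispense([
  { type: 'coin', cents: 25 },
  { type: 'coin', cents: 5 },
  { type: 'item', name: 'foo' },
  { type: 'coin', cents: 10 },
  { type: 'item', name: 'bar' },
]);
console.log(result); // foo with 29 cents change, bar with 9 cents change
```

Note that this model does nothing special when an item is selected with no coins in the machine; that case would need extra handling.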
Generally speaking you have the following set of equations:
The hard part is coins_in_machine. You need to switch the source observable based on emissions from two sources.
jsbin: http://jsbin.com/mejoneteyo/edit?html,js,console,output
[UPDATE]
Explanations:
coins_in_machine: when the emission comes from insert_coins, we want to sum the incoming values, as that sum will represent the new amount of coins in the machine. That means the definition of insert_coins switches from one stream to another under the logic defined before. That logic is what is implemented in the switchMapLatest.
switchMapLatest, and not switchMap, is needed, as otherwise the coins_in_machine stream would continue to receive emissions from formerly switched streams, i.e. duplicated emissions, since in the end there are only ever two streams to and from which we switch. If I may, I would say it is a close-and-switch that we need.
switchMapLatest has to return a stream, so we jump through hoops to make a stream that emits 0 and never ends (and does not block the computer, as using the repeat operator would in that case).
Finally, inserted_coins has to emit the values we want. My first implementation was inserted_coins.scan(sum,0) and that never worked. The key, and I found it quite tricky, is that when we get to that point in the flow, inserted_coins has already emitted one of the values that is part of the sum. That value is the one passed as a parameter of flatMapLatest, but it is not in the source anymore, so calling scan after the fact won't get it. It is therefore necessary to take that value from the flatMapLatest and reconstitute the correct behaviour.
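One possible reading of this state logic, written as a library-free fold rather than as the RxJS code from the jsbin: coins_in_machine grows on every inserted coin and drops back to 0 whenever an item is dispensed. In RxJS terms this would roughly correspond to merging the two sources into one tagged stream and running scan over it. The event shape, the names step and run, and the decision to queue a selection made while the machine is empty are all assumptions for this sketch, not something taken from the answer.

```javascript
const PRICE = 1; // cents, as in the question

// One state transition. State: coins currently in the machine, plus
// selections made while it was empty (queueing them is an assumption).
function step(state, event) {
  if (event.type === 'coin') {
    const coins = state.coins + event.cents;
    if (state.pending.length === 0) {
      return { coins, pending: state.pending, out: [] };
    }
    // A selection was waiting for a coin: serve it now and reset to 0.
    const [item, ...rest] = state.pending;
    return { coins: 0, pending: rest, out: [{ item, change: coins - PRICE }] };
  }
  // Item selected.
  if (state.coins === 0) {
    // Nothing inserted yet: remember the selection until a coin arrives.
    return { coins: 0, pending: [...state.pending, event.name], out: [] };
  }
  // Dispense and reset coins_in_machine to 0.
  return {
    coins: 0,
    pending: state.pending,
    out: [{ item: event.name, change: state.coins - PRICE }],
  };
}

// Fold the time-ordered events through step and collect what was dispensed.
function run(events) {
  let state = { coins: 0, pending: [], out: [] };
  const dispensed = [];
  for (const event of events) {
    state = step(state, event);
    dispensed.push(...state.out);
  }
  return dispensed;
}

// "foo" is selected before any coin, so it waits for the quarter.
const served = run([
  { type: 'item', name: 'foo' },
  { type: 'coin', cents: 25 },
  { type: 'coin', cents: 5 },
  { type: 'item', name: 'bar' },
]);
console.log(served); // foo with 24 cents change, bar with 4 cents change
```

The state here lives inside the fold's accumulator rather than in an outer variable, which is the property the question asked for.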