Accumulating and resetting values in a stream

Posted 2019-06-14 14:22

I'm playing with Reactive Programming, using RxJS, and stumbled upon something I'm not sure how to solve.

Let's say we implement a vending machine. You insert a coin, select an item, and the machine dispenses the item and returns change. We'll assume that the price is always 1 cent, so inserting a quarter (25 cents) should return 24 cents in change, and so on.

The "tricky" part is that I'd like to be able to handle cases like a user inserting 2 coins and then selecting an item, or selecting an item without inserting a coin.

It seems natural to implement inserted coins and selected items as streams. We can then introduce some sort of dependency between these 2 actions — merging or zipping or combining latest.

However, I quickly ran into an issue where I'd like coins to be accumulated up until an item is dispensed but not further. AFAIU, this means I can't use sum or scan since there's no way to "reset" previous accumulation at some point.

Here's an example diagram:

coins: ---25---5-----10------------|->
acc:   ---25---30----40------------|->
items: ------------foo-----bar-----|->
combined: ---------30,foo--40,bar--|->
change:------------29------39------|->

And the corresponding code:

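this.getCoinsStream()
  .scan(function(sum, current) { return sum + current })
  .combineLatest(this.getSelectedItemsStream(), function(cents, item) {
    // Pair the running total with the latest selected item
    return { cents: cents, item: item };
  })
  .subscribe(function(pair) {
    dispenseItem(pair.item);
    dispenseChange(pair.cents - 1);
  });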

25 and 5 cents were inserted and then "foo" item was selected. Accumulating coins and then combining latest would lead to "foo" being combined with "30" (which is correct) and then "bar" with "40" (which is incorrect; should be "bar" and "10").

I looked through all of the methods for grouping and filtering and don't see anything that I can use.

An alternative solution I could use is to accumulate coins separately. But this introduces state outside of a stream and I'd really like to avoid that:

var centsDeposited = 0;
this.getCoinsStream().subscribe(function(cents) {
  return centsDeposited += cents;
});

this.getSelectedItemsStream().subscribe(function(item) {
  dispenseItem(item);
  dispenseChange(centsDeposited - 1);
  centsDeposited = 0;
});

Moreover, this doesn't allow for making the streams depend on each other, such as waiting for a coin to be inserted before a selection can dispense an item.

Am I missing an existing method? What's the best way to achieve something like this: accumulating values up until the moment they need to be merged with another stream, but also waiting for at least 1 value in the 1st stream before merging it with the one from the 2nd?

3 answers
Animai°情兽
Reply #2 · 2019-06-14 14:48

You could keep your scan/combineLatest approach and then finish the stream with first followed by repeat. The stream then "starts over" after each dispense, which resets the accumulated sum, but your Observers never see the restart.

var coinStream = Rx.Observable.merge(
    Rx.Observable.fromEvent($('#add5'), 'click').map(5),
    Rx.Observable.fromEvent($('#add10'), 'click').map(10),
    Rx.Observable.fromEvent($('#add25'), 'click').map(25)
    );
var selectedStream = Rx.Observable.merge(
  Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
  Rx.Observable.fromEvent($('#sprite'), 'click').map('Sprite')
);

var $selection = $('#selection');
var $change = $('#change');

function dispense(selection) {
 $selection.text('Dispensed: ' + selection); 
 console.log("Dispensing Drink: " + selection);   
}

function dispenseChange(change) {
 $change.text('Dispensed change: ' + change); 
 console.log("Dispensing Change: " + change);   
}


var dispenser = coinStream.scan(function(acc, delta) { return acc + delta; }, 0)
                              .combineLatest(selectedStream, 
                                 function(coins, selection) {
                                   return {coins : coins, selection : selection};
                                 })

                              //Combine latest won't emit until both Observables have a value
                              //so you can safely get the first which will be the point that
                              //both Observables have emitted.
                              .first()

                              //First will complete the stream above so use repeat
                              //to resubscribe to the stream transparently
                              //You could also do this conditionally with while or doWhile
                              .repeat()

                              //If you only will subscribe once, then you won't need this but 
                              //here I am showing how to do it with two subscribers
                              .publish();

    
    //Dole out the change
    dispenser.pluck('coins')
             .map(function(c) { return c - 1;})
             .subscribe(dispenseChange);
    
    //Get the selection for dispensation
    dispenser.pluck('selection').subscribe(dispense);
    
    //Wire it up
    dispenser.connect();
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
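
To make the effect of first/repeat easier to check by hand, here is a minimal plain-JavaScript sketch (no RxJS) that models how I understand the pipeline above to behave on an ordered list of events. The function name simulateFirstRepeat and the {type, value} event shape are made up for this illustration; it is a model of the behaviour under those assumptions, not the Rx implementation.

```javascript
// Plain-JavaScript model (no RxJS) of scan + combineLatest + first + repeat.
// simulateFirstRepeat and the {type, value} event shape are hypothetical,
// introduced only for this sketch.
function simulateFirstRepeat(events) {
  var dispensed = [];
  var sum = null;        // null until a coin arrives in the current round
  var selection = null;  // null until an item is selected in the current round

  events.forEach(function (e) {
    if (e.type === 'coin') {
      sum = (sum === null ? 0 : sum) + e.value; // models scan
    } else {
      selection = e.value; // combineLatest keeps only the latest selection
    }
    if (sum !== null && selection !== null) {
      // models first(): take the first combined pair,
      // then repeat(): start a fresh round with nothing accumulated
      dispensed.push({ selection: selection, change: sum - 1 });
      sum = null;
      selection = null;
    }
  });
  return dispensed;
}

var rounds = simulateFirstRepeat([
  { type: 'coin', value: 25 },
  { type: 'coin', value: 5 },
  { type: 'item', value: 'foo' },
  { type: 'coin', value: 10 },
  { type: 'item', value: 'bar' }
]);
console.log(rounds);
```

With the sequence from the question this gives change of 29 for foo and 9 for bar. In this model a selection made before any coin is remembered and dispensed on the next coin, which is what combineLatest waiting for both sources implies.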

地球回转人心会变
Reply #3 · 2019-06-14 14:55

You can also use window to group multiple coin events together, with item selections as the window boundaries.

Next we can use zip to acquire the item value.

Notice that we try to give out an item as soon as one is selected, so the user has to insert coins before deciding on an item.

Notice that I decided to publish both selectedStream and dispenser for safety reasons: we don't want a race condition where events fire while we're still building up the query, leaving zip unbalanced. That would be a very rare condition here, but if our sources were cold Observables, they would start emitting as soon as we subscribe, and we would have to use publish to safeguard ourselves.

(Example code shamelessly borrowed from paulpdaniels.)

var coinStream = Rx.Observable.merge(
    Rx.Observable.fromEvent($('#add5'), 'click').map(5),
    Rx.Observable.fromEvent($('#add10'), 'click').map(10),
    Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);

var selectedStream = Rx.Observable.merge(
    Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
    Rx.Observable.fromEvent($('#sprite'), 'click').map('Sprite')
).publish();

var $selection = $('#selection');
var $change = $('#change');

function dispense(selection) {
    $selection.text('Dispensed: ' + selection); 
    console.log("Dispensing Drink: " + selection);   
}

function dispenseChange(change) {
    $change.text('Dispensed change: ' + change); 
    console.log("Dispensing Change: " + change);   
}

// Build the query.
var dispenser = Rx.Observable.zip(
    coinStream
        .window(selectedStream)
        .flatMap(ob => ob.reduce((acc, cur) => acc + cur, 0)),
    selectedStream,
    (coins, selection) => ({coins : coins, selection: selection})
).filter(pay => pay.coins != 0) // Do not give out items if there are no coins.
.publish();

var dispose = new Rx.CompositeDisposable(
    //Dole out the change
    dispenser
        .pluck('coins')
        .map(function(c) { return c - 1;})
        .subscribe(dispenseChange),

    //Get the selection for dispensation
    dispenser
        .pluck('selection')
        .subscribe(dispense),

    //Wire it up
    dispenser.connect(),
    selectedStream.connect()
);
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
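
For comparison, here is a rough plain-JavaScript model (no RxJS) of what I would expect the window/reduce/zip/filter query above to produce for an ordered list of events. The name simulateWindowZip and the {type, value} event shape are assumptions for this sketch only, and it ignores the final window that stays open after the last selection.

```javascript
// Plain-JavaScript model (no RxJS) of window + reduce + zip + filter.
// simulateWindowZip and the {type, value} event shape are hypothetical,
// introduced only for this sketch.
function simulateWindowZip(events) {
  var dispensed = [];
  var windowSum = 0; // models reduce(..., 0) over the current window

  events.forEach(function (e) {
    if (e.type === 'coin') {
      windowSum += e.value;
    } else {
      // A selection closes the window; zip pairs its sum with the selection,
      // and filter drops the pair when no coins were inserted.
      if (windowSum !== 0) {
        dispensed.push({ coins: windowSum, selection: e.value });
      }
      windowSum = 0; // the next window starts empty
    }
  });
  return dispensed;
}

var payments = simulateWindowZip([
  { type: 'item', value: 'Coke' },   // no coins yet, so nothing is dispensed
  { type: 'coin', value: 25 },
  { type: 'coin', value: 5 },
  { type: 'item', value: 'Sprite' },
  { type: 'coin', value: 10 },
  { type: 'item', value: 'Coke' }
]);
console.log(payments);
```

Unlike the first/repeat approach, a selection made with no coins in the machine is simply dropped here rather than remembered until the next coin.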

\"骚年 ilove
Reply #4 · 2019-06-14 15:03

Generally speaking you have the following set of equations:

inserted_coins :: independent source
items :: independent source
accumulated_coins :: sum(inserted_coins)
accumulated_paid :: sum(price(items))
change :: accumulated_coins - accumulated_paid
coins_in_machine :: when items : 0, when inserted_coins : sum(inserted_coins) starting after last emission of item

The hard part is coins_in_machine. You need to switch the source observable based on some emissions from two sources.

function emits ( who ) {
  return function ( x ) { console.log([who, ": "].join(" ") + x);};
}

function sum ( a, b ) {return a + b;}

var inserted_coins = Rx.Observable.fromEvent(document.getElementById("insert"), 'click').map(function ( x ) {return 15;});
var items = Rx.Observable.fromEvent(document.getElementById("item"), 'click').map(function ( x ) {return "snickers";});

console.log("running");

var accumulated_coins = inserted_coins.scan(sum);

var coins_in_machine =
    Rx.Observable.merge(
        items.tap(emits("items")).map(function ( x ) {return {value : x, flag : 1};}),
        inserted_coins.tap(emits("coins inserted ")).map(function ( x ) {return {value : x, flag : 0};}))
        .distinctUntilChanged(function(x){return x.flag;})
        .flatMapLatest(function ( x ) {
                   switch (x.flag) {
                     case 1 :
                       return Rx.Observable.just(0);
                     case 0 :
                       return inserted_coins.scan(sum, x.value).startWith(x.value);
                   }
                 }
    ).startWith(0);

coins_in_machine.subscribe(emits("coins in machine"));

jsbin : http://jsbin.com/mejoneteyo/edit?html,js,console,output

[UPDATE]

Explanations:

  • We merge the inserted_coins stream with the items stream, attaching a flag to each value so that we know which of the two emitted when a value arrives in the merged stream.
  • When the items stream emits, we want to put 0 in coins_in_machine. When inserted_coins emits, we want to sum the incoming values, as that sum represents the new amount of coins in the machine. That means the definition of coins_in_machine switches from one stream to another under the logic defined above. That logic is what is implemented in flatMapLatest.
  • I use flatMapLatest and not flatMap, as otherwise the coins_in_machine stream would continue to receive emissions from formerly switched streams, i.e. duplicated emissions, since in the end there are only ever two streams to and from which we switch. If I may, I would say it is a close-and-switch that we need.
  • The function passed to flatMapLatest has to return a stream, so in the item case we wrap the 0 in a stream with Rx.Observable.just(0) (using the repeat operator to keep emitting 0 would block the computer in that case).
  • We jump through some extra hoops to make inserted_coins emit the values we want. My first implementation was inserted_coins.scan(sum, 0), and that never worked. The key, and I found that quite tricky, is that when we get to that point in the flow, inserted_coins has already emitted one of the values that is part of the sum. That value is the one passed as a parameter to flatMapLatest, but it is not in the source anymore, so calling scan after the fact won't get it. It is therefore necessary to take that value from flatMapLatest and reconstitute the correct behaviour.
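
The coins_in_machine equation can also be read as a single fold over the merged, flagged events, which is a simpler technique than the flatMapLatest switching described above. Below is a minimal plain-JavaScript sketch of that reading (no RxJS); the function name coinsInMachine is made up for the illustration, and the {value, flag} shape is borrowed from the merge in the code above.

```javascript
// Plain-JavaScript sketch (no RxJS): coins_in_machine as one fold over
// flagged events. coinsInMachine is a hypothetical name for this sketch;
// flag 1 marks an item emission, flag 0 a coin emission, as in the merge above.
function coinsInMachine(events) {
  var balance = 0;
  var history = [balance]; // mirrors startWith(0)

  events.forEach(function (e) {
    // An item resets the machine to 0; a coin adds to the running sum.
    balance = e.flag === 1 ? 0 : balance + e.value;
    history.push(balance);
  });
  return history;
}

var history = coinsInMachine([
  { value: 15, flag: 0 },
  { value: 15, flag: 0 },
  { value: 'snickers', flag: 1 },
  { value: 15, flag: 0 }
]);
console.log(history);
```

One difference from the stream version: this fold records a 0 for every item event, whereas the distinctUntilChanged in the code above would suppress a second item emission in a row.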