Chained redux-observable epic only fires correctly

2019-05-22 13:06发布

I've set up an epic that waits for another epic to complete, much like @jayphelps' answer here: Invoking epics from within other epics

However I've found that it only seems to run once. After that I can see the CART_CONFIG_READY action in the console but the DO_THE_NEXT_THING action is not triggered.

I've tried various combinations of mergeMap and switchMap, with and without take but nothing seems to help.

This is (kind of) what my code looks like.

import { NgRedux } from '@angular-redux/store';
import { Observable } from 'rxjs/Observable';
import { ActionsObservable } from 'redux-observable';

export class CartEpicsService {

checkCart = (action$: ActionsObservable<any>, store: NgRedux<any>) => {

    return action$.ofType('CHECK_CART')
        .switchMap(() => {

            console.log('___LISTENING___');

            return action$.ofType('CART_CONFIG_READY')
                .take(1) // removing this doesn't help
                .mergeMap(() => {

                    console.log('___RECEIVED___');

                    // do stuff here

                    return Observable.of({
                        type: 'DO_THE_NEXT_THING'
                    });

                })
                .startWith({
                    type: 'GET_CART_CONFIG'
                });

        });

}

getCartConfig = (action$: ActionsObservable<any>, store: NgRedux<any>) => {

    return action$.ofType('GET_CART_CONFIG')
        .switchMap(() => {

            const config = store.getState().config;

            // we already have the config
            if (config) {
                return Observable.of({
                    type: 'CART_CONFIG_READY'
                });
            }

            // otherwise load it from the server using out HTTP service
            return this.http.get('/cart/config')
                .switchMap((response) => {
                    return Observable.concat(
                        Observable.of({
                            type: 'CART_CONFIG_SUCCESS'
                        }),
                        Observable.of({
                            type: 'CART_CONFIG_READY'
                        })
                    );
                })
                .catch(error => Observable.of({
                    type: 'CART_CONFIG_ERROR',
                    error
                }));


        });

    }

}

For context I need the response from the /cart/config endpoint to check the validity of the cart. I only need to download the config once.

Here is a runnable example on JS Bin:

https://jsbin.com/vovejibuwi/1/edit?js,console

1条回答
Evening l夕情丶
2楼-- · 2019-05-22 13:43

Dang this is definitely a tricky one!

Cause

When state.config === true you return an Observable of CART_CONFIG_READY that emits synchronously, whereas during the first time the http request (or delay, in the jsbin) means it is always going to be async.

Why this makes a difference is in the checkCart epic you return an observable chain that listens for CART_CONFIG_READY with action$.ofType('CART_CONFIG_READY') but also applies a .startWith({ type: 'GET_CART_CONFIG' }). That means that GET_CART_CONFIG is going to be emitted synconously before action$.ofType('CART_CONFIG_READY') is subscribed because startWith is basically shorthand for a concat, which might would make the issue more clear if you're familiar with it. It's nearly exactly the same as doing this:

Observable.concat(
  Observable.of({
    type: 'GET_CART_CONFIG'
  }),
  action$.ofType('CART_CONFIG_READY') // not subscribed until prior complete()s
    .take(1)
    .mergeMap(() => {
      // stuff
    })
);

So to summarize, what is happening the second time around GET_CART_CONFIG is dispatched synchronously, getCartConfig receives it and sees the config is already in the store so it synchronously dispatches CART_CONFIG_READY. But we are not yet listening for it in checkCart so it goes unanswered. Then that callstack returns and the next Observable in the concat, our action$.ofType('CART_CONFIG_READY') chain, gets subscribed to. But too late, the action it listens for has already been emitted!

Solutions

One way to fix this is to make either the emitting of CART_CONFIG_READY always async, or to start listening for it in the other epic before we dispatch GET_CART_CONFIG.

1. emit CART_CONFIG_READY async

Observable.of accepts a scheduler as its last argument, and RxJS supports several of them.

In this case you could use the AsyncScheduler (macrotask) or the AsapScheduler (microtask). Both will work in this case, but they schedule on different times in the JavaScript event loop. If you're not familiar with event loop tasks, check this out.

I would personally recommend using the AsyncSheduler in this case because it will provide the closest async behavior to making an http request.

import { async } from 'rxjs/scheduler/async';

// later inside your epic...

return Observable.of({
  type: 'CART_CONFIG_READY'
}, async);

2. Listen for CART_CONFIG_READY before emitting GET_CART_CONFIG

Because startWith is shorthand for a concat (which we don't want to do) we instead need to use some form of merge, with our ofType chain first so that we listen before emitting.

action$.ofType('CART_CONFIG_READY')
  .take(1)
  .mergeMap(() => {
    // stuff
  })
  .merge(
    Observable.of({ type: 'GET_CART_CONFIG' })
  )

// or

Observable.merge(
  action$.ofType('CART_CONFIG_READY')
    .take(1)
    .mergeMap(() => {
      // stuff
    }),
  Observable.of({ type: 'GET_CART_CONFIG' })
)

// both are exactly the same, pick personal preference on appearance

You only need to do one of these solutions, but it wouldn't hurt to do both of them. Offhand I would probably recommend using both just so that things are consistent and expected, even if they are a bit more verbose.


You might also be happy to know that Observable.of accepts any number of items, which will be emitted in order. So you don't need to use concat:

// before

Observable.concat(
  Observable.of({
    type: 'CART_CONFIG_SUCCESS'
  }),
  Observable.of({
    type: 'CART_CONFIG_READY'
  })
)

// after

Observable.of({
  type: 'CART_CONFIG_SUCCESS'
}, {
  type: 'CART_CONFIG_READY'
})

Thanks so much for the jsbin btw, it made it much easier to debug.


Edit based on your comment:

Out of curiosity did you figure this out through experience or debugging?

A combination of both. I've dealt with a ton of async/scheduled code and ordering is very commonly the source of issues. I scanned the code, mentally picturing execution, noticed the difference in async vs sync depending on codepath, then I made a quick operator to make it easy for me to confirm the order in which any Observable chain is subscribed to.

Observable.prototype.logOnSubscribe = function (msg) {
  // defer is a pretty useful Observable to learn if you haven't yet
  return Observable.defer(() => {
    console.log(msg);
    return this; // the original source
  });
};

I applied it to several places, but the most important are these two:

action$.ofType('CART_CONFIG_READY')
  .take(1)
  .mergeMap(() => {
    // stuff
  })
  .logOnSubscribe('listening for CART_CONFIG_READY') // <--- here
  .startWith({
    type: 'GET_CART_CONFIG'
  });

  //  and in the other epic...

  if (hasConfig) {
    return Observable.of({
      type: 'CART_CONFIG_READY'
    })
    .logOnSubscribe('emitting CART_CONFIG_READY');  // <--- and here
  }

It confirmed that in the second code path CART_CONFIG_READY was getting emitted before the other epic was listening for it.

查看更多
登录 后发表回答