How to handle async function in redux-observable?

Posted 2019-07-26 04:15

Question:

I am using RxJS and redux-observable.

I am trying to read a file in an epic. In my case, I have to do it in an epic, because another epic triggers this one an unknown number of times via the expand operator.

But since FileReader is async, the code below does not work: the value returned inside reader.onload never reaches the stream, so map emits undefined.

What is the correct way, especially the RxJS way, to handle this? Thanks.

export const uploadAttachmentEpic = (action$, store) =>
  action$
    .ofType(UPLOAD_ATTACHMENT)
    .map(action => {
      const reader = new FileReader();

      reader.onload = () => {
        return {
          ...action,
          payload: {
            ...action.payload,
            base64: reader.result
          }
        }
      };

      reader.readAsDataURL(action.payload.file);
    })
    .mergeMap(action =>
      ajax
        .post( /* use action.payload.base64 */ )
        .map(uploadAttachmentSucceed)
        .catch(uploadAttachmentFailed)
    );

Answer 1:

Fan's answer (as of this writing) is good but has some caveats to it that are important:

  • It starts reading the file immediately instead of lazily. So just calling readFile(file) starts it even before anyone has subscribed. This is error-prone because it's possible that someone might not synchronously subscribe to it right away and then the reader.onload will miss it. Observables are ideally made completely lazy and repeatable factories.

  • It never calls obs.complete() on the observer, so it's possible the subscription will be a memory leak because it never ends.

  • The methods on the observer are not bound, so reader.onerror = obs.error won't actually work. Instead you need to use either e => obs.error(e) or obs.error.bind(obs), because a method assigned on its own loses its this binding.

  • It doesn't abort the reading on unsubscribe.
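The binding caveat in the list above can be reproduced without RxJS. This is only a minimal sketch: DemoObserver and fakeReader are hypothetical stand-ins for the real observer and FileReader, chosen so the snippet runs anywhere.

```javascript
// Hypothetical stand-in for an observer whose methods rely on `this`.
class DemoObserver {
  constructor() {
    this.errors = [];
  }
  error(e) {
    // Needs `this` to be the observer instance.
    this.errors.push(e);
  }
}

const obs = new DemoObserver();

// Hypothetical stand-in for a FileReader: just an object with an onerror slot.
const fakeReader = { onerror: null };

// Unbound assignment: when called, `this` is fakeReader, not the observer.
fakeReader.onerror = obs.error;
let detachedFailed = false;
try {
  fakeReader.onerror('boom');
} catch (err) {
  // fakeReader has no `errors` array, so push() is called on undefined.
  detachedFailed = err instanceof TypeError;
}

// Either fix keeps `this` pointing at the observer.
const viaArrow = e => obs.error(e);
const viaBind = obs.error.bind(obs);
viaArrow('first');
viaBind('second');
```

Only the two bound calls reach the observer; the unbound one fails before anything is recorded.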

Here's how I would do it:

import { Observable } from 'rxjs';

function readFile(file) {
  // Could use Observable.create (same thing) but I
  // prefer this one because Observable.create is
  // not part of the TC39 proposal
  return new Observable(observer => {
    const reader = new FileReader();
    reader.onload = (e) => {
      observer.next(reader.result);
      // It's important to complete() otherwise this
      // subscription might get leaked because it
      // "never ends"
      observer.complete();
    };
    reader.onerror = e => observer.error(e);
    reader.readAsDataURL(file);

    // unsubscribe handler aka cleanup
    return () => {
      // Only abort while the reader is still in the
      // LOADING state (readyState === 1).
      // Calling abort() any other time
      // will throw an exception.
      if (reader.readyState === 1) {
        reader.abort();
      }
    };
  });
}

This pattern can be applied to nearly any API, so it's pretty handy to understand exactly how it works.
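As a rough sketch of the same pattern on a different API, the snippet below wraps a Node.js EventEmitter. The Observable class here is a tiny stand-in written for this example (it is not RxJS) so the code runs without dependencies. The fromEmitter helper and the 'data' / 'end' / 'error' event names are hypothetical.

```javascript
const { EventEmitter } = require('events');

// ASSUMPTION: minimal stand-in for RxJS's Observable. It only mimics the
// subscribe/unsubscribe contract and expects an observer that defines
// next, error and complete.
class Observable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(observer) {
    let closed = false;
    let teardown = null;
    const close = () => {
      if (closed) return;
      closed = true;
      if (teardown) teardown();
    };
    teardown = this.subscribeFn({
      next: v => { if (!closed) observer.next(v); },
      error: e => { if (!closed) { observer.error(e); close(); } },
      complete: () => { if (!closed) { observer.complete(); close(); } },
    });
    // If the source finished synchronously, teardown was not set yet: run it now.
    if (closed && teardown) teardown();
    return { unsubscribe: close };
  }
}

// Hypothetical helper: nothing is attached until someone subscribes.
function fromEmitter(emitter) {
  return new Observable(observer => {
    const onData = value => observer.next(value);
    const onEnd = () => observer.complete();
    const onError = err => observer.error(err);
    emitter.on('data', onData);
    emitter.on('end', onEnd);
    emitter.on('error', onError);
    // Cleanup: runs on unsubscribe, complete, or error.
    return () => {
      emitter.removeListener('data', onData);
      emitter.removeListener('end', onEnd);
      emitter.removeListener('error', onError);
    };
  });
}

const emitter = new EventEmitter();
const source$ = fromEmitter(emitter); // lazy: no listeners attached yet
const listenersBefore = emitter.listenerCount('data');

const received = [];
let completed = false;
const subscription = source$.subscribe({
  next: v => received.push(v),
  error: () => {},
  complete: () => { completed = true; },
});

emitter.emit('data', 'a');
emitter.emit('data', 'b');
emitter.emit('end'); // completes and triggers the cleanup
const listenersAfter = emitter.listenerCount('data');
subscription.unsubscribe(); // safe to call again; cleanup already ran
```

The shape is the same as readFile: attach handlers inside the subscribe function, call complete() when the source is done, and return a cleanup function that detaches everything.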


I hope Fan doesn't mind the critique! I don't mean to offend, just want to share knowledge.



Answer 2:

Your file-reading step does not return an Observable, so the async process is not handled properly. I suggest first creating a file-reading function that returns an Observable, then attaching it with flatMap().

  function readFile(file) {
    let reader = new FileReader();
    // Kick off the read (this runs as soon as readFile() is called)
    reader.readAsDataURL(file);
    return Observable.create(obs => {
      reader.onload = function (e) {
        obs.next(reader.result);
      };
      reader.onerror = obs.error;
    });
  }

Then, in your epic, you can merge it in with flatMap (an alias of mergeMap in RxJS 5), for example .flatMap(action => readFile(action.payload.file))