What is the correct way to share the result of an

Posted 2018-12-31 00:41

By using Http, we call a method that does a network call and returns an http observable:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

If we take this observable and add multiple subscribers to it:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

What we want is to ensure that this does not cause multiple network requests.

This might seem like an unusual scenario, but it's actually quite common: for example, if the caller subscribes to the observable to display an error message and also passes it to the template using the async pipe, we already have two subscribers.
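
To make the problem concrete, here is a minimal sketch that does not use RxJS or Angular at all. `ColdSource` is a hypothetical stand-in for a cold observable such as the one returned by `http.get()`, and `requestCount` stands in for the network. Under those assumptions, two subscriptions mean two runs of the producer:

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical stand-in for a cold observable (not the RxJS Observable class).
class ColdSource<T> {
  private producer: (emit: Listener<T>) => void;

  constructor(producer: (emit: Listener<T>) => void) {
    this.producer = producer;
  }

  // Every subscription re-runs the producer from scratch.
  subscribe(listener: Listener<T>): void {
    this.producer(listener);
  }
}

let requestCount = 0;

// Stand-in for getCustomer(): each run of the producer counts as one "request".
function getCustomer(): ColdSource<{ name: string }> {
  return new ColdSource<{ name: string }>((emit) => {
    requestCount += 1;
    emit({ name: 'Alice' });
  });
}

const network$ = getCustomer();
const received: string[] = [];

network$.subscribe((customer) => received.push(customer.name));
network$.subscribe((customer) => received.push(customer.name));

console.log(requestCount); // 2
```

The answers below are all ways of putting a shared buffer between the producer and its subscribers so that this count stays at 1.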

What is the correct way of doing that in RxJs 5?

Namely, this seems to work fine:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

But is this the idiomatic way of doing this in RxJs 5, or should we do something else instead?

Note: With the new HttpClient in Angular 5, the .map(res => res.json()) part in all examples is no longer needed, because a JSON response is assumed by default.

21 answers
人间绝色
#2 · 2018-12-31 01:03

Following @Cristian's suggestion, this is one way that works well for HTTP observables, which emit only once and then complete:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}
与风俱净
#3 · 2018-12-31 01:04

rxjs 5.4.0 has a new shareReplay method.

The author explicitly says "ideal for handling things like caching AJAX results"

rxjs PR #2443 feat(shareReplay): adds shareReplay variant of publishReplay

shareReplay returns an observable that is the source multicasted over a ReplaySubject. That replay subject is recycled on error from the source, but not on completion of the source. This makes shareReplay ideal for handling things like caching AJAX results, as it's retryable. Its repeat behavior, however, differs from share in that it will not repeat the source observable, rather it will repeat the source observable's values.
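
The behavior described in that PR text can be sketched without RxJS. The `replayLast` helper below is hypothetical and is not how shareReplay is implemented; it only mimics the two properties quoted above for a synchronous, single-value source: a successful value is buffered and replayed to later subscribers, and an error leaves nothing buffered so that the next subscriber retries.

```typescript
type Handlers<T> = {
  next: (value: T) => void;
  error?: (err: unknown) => void;
};

// Hypothetical cold, single-value source: calling it runs the "request".
type Source<T> = (handlers: Handlers<T>) => void;

// Assumption: the source emits synchronously, so in-flight sharing is not modeled.
function replayLast<T>(source: Source<T>): Source<T> {
  let cached: { value: T } | null = null;

  return (handlers) => {
    if (cached !== null) {
      // Replay the buffered value; the source is not run again.
      handlers.next(cached.value);
      return;
    }
    source({
      next: (value) => {
        cached = { value };
        handlers.next(value);
      },
      error: (err) => {
        // Nothing is kept on error, so the next subscriber triggers a retry.
        cached = null;
        if (handlers.error) handlers.error(err);
      },
    });
  };
}

let calls = 0;

// Fails on the first call, succeeds afterwards.
const flaky: Source<string> = (handlers) => {
  calls += 1;
  if (calls === 1) {
    if (handlers.error) handlers.error(new Error('network down'));
    return;
  }
  handlers.next('customer');
};

const shared = replayLast(flaky);
const log: string[] = [];

shared({ next: (v) => log.push(v), error: () => log.push('error') }); // fails
shared({ next: (v) => log.push(v) }); // retries the source and succeeds
shared({ next: (v) => log.push(v) }); // served from the buffer

console.log(calls, log); // 2 [ 'error', 'customer', 'customer' ]
```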

回忆,回不去的记忆
#4 · 2018-12-31 01:06

I found a way to store the http get result into sessionStorage and use it for the session, so that it will never call the server again.

I used it when calling the GitHub API, to avoid hitting the usage limit.

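import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/map';

@Injectable()
export class HttpCache {
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    // getItem() returns null when nothing is stored under this key.
    const cached = sessionStorage.getItem(url);
    if (cached !== null) {
      // Cache hit: return the stored response body without touching the network.
      return Observable.of(JSON.parse(cached));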
    } else {
      return this.http.get(url)
        .map(resp => {
          sessionStorage.setItem(url, resp.text());
          return resp.json();
        });
    }
  }
}

FYI, the sessionStorage limit is about 5 MB (or 4.75 MB), so it should not be used like this for large data sets.

------ edit -------------
If you want the data to be refreshed when the page is reloaded with F5, use an in-memory cache instead of sessionStorage:

@Injectable()
export class HttpCache {
  cached: any = {};  // this will store data
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    if (this.cached[url]) {
      return Observable.of(this.cached[url]);
    } else {
      return this.http.get(url)
        .map(resp => {
          // Cache the parsed body so that cache hits return the same shape as the first call.
          this.cached[url] = resp.json();
          return this.cached[url];
        });
    }
  }
}
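
One gap in the in-memory version is that two callers asking for the same URL before the first response arrives both miss the cache. Below is a sketch of one way to close that gap, under stated assumptions: it uses plain Promises rather than the Http observable from this answer, and `InFlightCache` and `Fetcher` are hypothetical names. The idea is to cache the pending request itself instead of its result.

```typescript
// Hypothetical fetch function; in a real app this would wrap the HTTP call.
type Fetcher = (url: string) => Promise<string>;

class InFlightCache {
  private entries = new Map<string, Promise<string>>();
  private fetcher: Fetcher;

  constructor(fetcher: Fetcher) {
    this.fetcher = fetcher;
  }

  get(url: string): Promise<string> {
    const existing = this.entries.get(url);
    if (existing !== undefined) {
      // Either already resolved or still pending: both are shared as-is.
      return existing;
    }
    const pending = this.fetcher(url);
    this.entries.set(url, pending);
    // Drop failed requests so that a later call can retry.
    pending.catch(() => this.entries.delete(url));
    return pending;
  }
}

let fetchCount = 0;

const httpCache = new InFlightCache((url) => {
  fetchCount += 1;
  return Promise.resolve('body of ' + url);
});

// Both calls happen before the first one has resolved.
const first = httpCache.get('/someUrl');
const second = httpCache.get('/someUrl');

console.log(fetchCount, first === second); // 1 true
```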
旧时光的记忆
#5 · 2018-12-31 01:07

rxjs 5.3.0

I haven't been happy with .map(myFunction).publishReplay(1).refCount()

With multiple subscribers, .map() executes myFunction twice in some cases (I expect it to only execute once). One fix seems to be publishReplay(1).refCount().take(1)

Another option is to skip refCount() and make the Observable hot right away:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

This will start the HTTP request regardless of subscribers. I'm not sure if unsubscribing before the HTTP GET finishes will cancel it or not.

几人难应
#6 · 2018-12-31 01:08

Use .publishReplay(1).refCount() or .publishLast().refCount(), since Angular Http observables complete after the request.

This simple class caches the result, so you can subscribe to .value many times while it makes only one request. You can also call .reload() to make a new request and publish the new data.

You can use it like:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

and the source:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}
路过你的时光
#7 · 2018-12-31 01:08

Have you tried running the code you already have?

Because you are constructing the Observable from the promise resulting from getJSON(), the network request is made before anyone subscribes. And the resulting promise is shared by all subscribers.

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...
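
The eager behavior of promises can be checked with plain TypeScript. `fakeGetJSON` below is a hypothetical stand-in for `jQuery.getJSON()` (no jQuery or network is involved); it only assumes that the call starts its work immediately, which is how a Promise executor behaves.

```typescript
let networkCalls = 0;

// Hypothetical stand-in for jQuery.getJSON(): the work starts when it is called.
function fakeGetJSON(url: string): Promise<{ url: string }> {
  return new Promise<{ url: string }>((resolve) => {
    networkCalls += 1; // the executor runs synchronously, at call time
    resolve({ url });
  });
}

const promise = fakeGetJSON('/someUrl');

// The "request" has already happened, before anyone listens for the result.
const callsBeforeAnyHandler = networkCalls;

// Attaching handlers does not run the executor again.
promise.then((data) => console.log('first handler got', data.url));
promise.then((data) => console.log('second handler got', data.url));

console.log(callsBeforeAnyHandler, networkCalls); // 1 1
```

This is the opposite of the cold observable in the question, where nothing happens until the first subscription.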