Using rx.js, how do I emit a memoized result from

2019-05-22 22:33发布

问题:

I'm currently teaching myself reactive programming with rxjs, and I've set myself a challenge of creating an observable stream which will always emit the same result to a subscriber no matter what.

I've memoized the creation of an HTTP "GET" stream given a specific URL, and I'm trying to act on that stream every two seconds, with the outcome being that for each tick of the timer, I'll extract a cached/memoized HTTP result from the original stream.

import superagent from 'superagent';
import _ from 'lodash';

// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  req = req.end.bind(req);
  return Rx.Observable.fromNodeCallback(req)();
});

// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');

// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
  .map(() => response$)
  .flatMap($ => $)
  .subscribe(response => {
    console.log('Got the response!');
  });

I was sure that I'd have to stick a call to replay() in there somewhere, but no matter what I do, a fresh HTTP call is initiated every two seconds. How can I structure this so that I can construct an observable from a URL and have it always emit the same HTTP result to any subsequent subscribers?

EDIT
I found a way to get the result I want, but I feel like I am missing something, and should be able to refactor this with a much more streamlined approach:

var httpget = _.memoize(function(url) {
  var subject = new Rx.ReplaySubject();
  try {
    superagent.get(url).end((err, res) => {
      if(err) {
        subject.onError(err);
      }
      else {
        subject.onNext(res);
        subject.onCompleted();
      }
    });
  }
  catch(e) {
    subject.onError(e);
  }
  return subject.asObservable();
});

回答1:

Your first code sample is actually closer to the way to do it

var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  return Rx.Observable.fromNodeCallback(req.end, req)();
});

However, this isn't working because there appears to be a bug in fromNodeCallback. As to work around till this is fixed, I think you are actually looking for the AsyncSubject instead of ReplaySubject. The latter works, but the former is designed for exactly this scenario (and doesn't have the overhead of an array creation + runtime checks for cache expiration if that matters to you).

var httpget = _.memoize(function(url) {

  var subject = new Rx.AsyncSubject();
  var req = superagent.get(url);
  Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
  return subject.asObservable();

});

Finally, though map appreciates that you are thinking of it, you can simplify your timer code by using the flatMap overload that takes an Observable directly:

Rx.Observable.timer(0, 2000)
  .flatMap($response)
  .subscribe(response => {
    console.log('Got the response');
  });


回答2:

Unless I am getting your question wrong, Observable.combineLatest does just that for you, it cache the last emitted value of your observable.

This code sends the request once and then give same cached response every 200 ms:

import reqPromise from 'request-promise';
import {Observable} from 'rx';

let httpGet_ = (url) =>
      Observable
      .combineLatest(
        Observable.interval(200),
        reqPromise(url),
        (count, response) => response
      );

httpGet_('http://google.com/')
  .subscribe(
    x => console.log(x),
    e => console.error(e)
  );