I have a node.js EventEmitter which raises the following events: error
, message
.
Is there a straight forward way I can create an RxJS Observable from it?
i.e next()
called on message
and error()
called on error
.
I have a node.js EventEmitter which raises the following events: error
, message
.
Is there a straight forward way I can create an RxJS Observable from it?
i.e next()
called on message
and error()
called on error
.
You can create it like this:
const obs$ = Observable.create(observer => {
emitter.on('message', val => observer.next(val));
emitter.on('error', err => observer.error(err));
});
As an alternative, you can do this by constructinng and chaining observables like this, but it's clearly less readable:
const message$ = Observable.fromEvent(emitter, 'message');
const error$ = Observable.fromEvent(emitter, 'error');
const obs$ = Observable.merge(
message$.catch(err => Observable.of(err)),
error$.mergeMap(val => Observable.throw(val)),
);