As I understand from this question, the timeout operator errors if an observable does not emit a value within the given window of time. The problem for me is that this window resets after each emission, so you have to complete the sequence if you only care whether the first value(s) arrive within the window.
Is there a nice way to have a "timeout to first", other than .take(1).timeout(1000)?
In addition to @Maxime's answer, you can use race to set up a race between your observable's first value and a timeout. We construct the timeout by combining never with timeout.
Thus we end up with a race between your source observable and an observable that never emits a value but throws an error once the time has elapsed. If your source observable produces its first value in time, race stops listening to the timeout observable.
const timed = source.race(Observable.never().timeout(1000));
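To make the mechanics concrete without depending on any particular RxJS version, here is a minimal, dependency-free sketch of the same idea: arm one timer at subscription and disarm it for good once the first value arrives. Everything in it is an assumption for illustration, not RxJS API: the name timeoutFirst, the convention that a "source" is a function taking an observer and returning an unsubscribe function, and the injectable timers parameter (there only so the behaviour can be exercised with fake timers).

```javascript
// Hypothetical helper, not part of RxJS.
// A "source" here is a function: (observer) => unsubscribe.
function timeoutFirst(source, ms, timers = {
  set: (fn, delay) => setTimeout(fn, delay),
  clear: id => clearTimeout(id)
}) {
  return observer => {
    let done = false;
    let unsubscribe = () => {};

    // Arm the timer once, at subscription time. It is never re-armed.
    const timer = timers.set(() => {
      if (done) return;
      done = true;
      unsubscribe();
      observer.error(new Error(`no first value within ${ms}ms`));
    }, ms);

    unsubscribe = source({
      next(value) {
        if (done) return;
        timers.clear(timer); // a value arrived: the timeout no longer applies
        observer.next(value);
      },
      error(err) {
        if (done) return;
        done = true;
        timers.clear(timer);
        observer.error(err);
      },
      complete() {
        if (done) return;
        done = true;
        timers.clear(timer);
        observer.complete();
      }
    });

    // Unsubscribing also cancels the pending timer.
    return () => {
      done = true;
      timers.clear(timer);
      unsubscribe();
    };
  };
}
```

Unlike a plain timeout, later values can arrive as slowly as they like, because the timer is only ever set once.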
As far as I'm aware, there's no built-in timeout for the first value only, but here's a way of doing it:
(Plunkr available here)
const { Observable } = Rx;

// this function can be used with the let operator
// ex: someObs$.let(timeoutFirstOnly(100)).subscribe(...)
// it'll emit an error if the first value arrives later
// than the timeout parameter value
const timeoutFirstOnly = timeout => observable$ => {
  // share so that both partitions use a single subscription to the source
  const obs$ = observable$.share();
  const [firstValue$, others$] = obs$.partition((value, index) => index === 0);
  return Observable.merge(firstValue$.timeout(timeout).first(), others$);
};

const created = Rx.Observable.create(observer => {
  // here, this is working
  // try to emit at 150ms for example and it'll throw an error
  setTimeout(() => observer.next(42), 150);
});

created
  .let(timeoutFirstOnly(100))
  .subscribe(
    val => console.log(`received ${val}`),
    error => console.log(`threw ${error}`)
  );
But I felt there might be another way, so I asked on Gitter, and @Dorus gave me a better solution:
created.publish(src =>
  Rx.Observable.merge(
    src.take(1).timeout(100),
    src.skip(1)
  )
);
Explanation here, all credit goes to @Dorus on this one:
publish has 2 flavours:
.publish() : ConnectableObservable<T>
.publish(selector : (src : Observable<T>) => Observable<R>) : Observable<R>
.publish() returns a ConnectableObservable and can be used either by calling refCount on it or by calling connect on it yourself.
.publish(selector) lets you use the multicasted source in the selector, and emits the result.
By the way, .publish() is the same as .multicast(() => new Subject()) and .publish(selector) is the same as .multicast(() => new Subject(), selector).
Also, .publish().refCount() is pretty much the same as .share(). You are correct to call share before partition. That solves the problem partially. I still prefer .publish(selector) because it's safer: .publish(selector) will run the selector and subscribe to all sources before it subscribes to the source observable. With share, the first subscription activates the source, and anything emitted synchronously is missed by the second subscription. In the worst case, with a fully synchronous source, the second subscription would rerun the source completely.
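The ordering problem is easier to see in a small, dependency-free model. This is only a sketch: makeSubject, shareLike and publishLike are hypothetical stand-ins written for illustration (they are not the RxJS implementations), a "source" is just a function taking a next callback, and the source emits synchronously without completing, so this models the "missed values" case rather than the "rerun" case.

```javascript
// Hypothetical minimal multicast helpers, not the RxJS implementation.
function makeSubject() {
  const observers = [];
  return {
    subscribe: fn => { observers.push(fn); },
    next: value => observers.forEach(fn => fn(value))
  };
}

// A source that emits 1, 2, 3 synchronously as soon as it is subscribed.
const syncSource = next => { [1, 2, 3].forEach(value => next(value)); };

// share-like: the source is connected as soon as the FIRST observer subscribes.
function shareLike(source) {
  const subject = makeSubject();
  let connected = false;
  return fn => {
    subject.subscribe(fn);
    if (!connected) {
      connected = true;
      source(subject.next);
    }
  };
}

// publish(selector)-like: the selector wires up all of its subscriptions
// first, and only then is the source connected.
function publishLike(source, selector) {
  const subject = makeSubject();
  selector(subject.subscribe);
  source(subject.next);
}

const sharedA = [];
const sharedB = [];
const shared = shareLike(syncSource);
shared(value => sharedA.push(value)); // activates the source; 1, 2, 3 flow right now
shared(value => sharedB.push(value)); // subscribes after everything was emitted

const publishedA = [];
const publishedB = [];
publishLike(syncSource, subscribe => {
  subscribe(value => publishedA.push(value));
  subscribe(value => publishedB.push(value));
});

console.log(sharedA, sharedB);       // [ 1, 2, 3 ] []
console.log(publishedA, publishedB); // [ 1, 2, 3 ] [ 1, 2, 3 ]
```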
One last addition: .publish(selector) has the same signature as .let(selector), but let does not multicast the source. That's the difference.
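A rough way to picture that difference is to count how many times the underlying source gets subscribed. Again this is a dependency-free sketch under stated assumptions: letLike and publishSelectorLike are hypothetical stand-ins (not RxJS code), and a "source" is a plain function taking a next callback.

```javascript
// Hypothetical stand-ins, not RxJS.
let subscriptions = 0;
const coldSource = next => {
  subscriptions += 1; // every direct subscription re-runs the producer
  next(42);
};

// let-like: the selector receives the source itself.
const letLike = (source, selector) => selector(source);

// publish(selector)-like: the selector receives a multicasted stand-in,
// and the real source is subscribed exactly once afterwards.
const publishSelectorLike = (source, selector) => {
  const listeners = [];
  selector(fn => { listeners.push(fn); });
  source(value => listeners.forEach(fn => fn(value)));
};

// A selector that subscribes twice, in the spirit of
// merge(src.take(1).timeout(100), src.skip(1)).
const subscribeTwice = src => {
  src(() => {});
  src(() => {});
};

letLike(coldSource, subscribeTwice);
const letSubscriptions = subscriptions;

subscriptions = 0;
publishSelectorLike(coldSource, subscribeTwice);
const publishSubscriptions = subscriptions;

console.log(letSubscriptions, publishSubscriptions); // 2 1
```

With a cold source, the two subscriptions made through the let-like helper each run the producer, which is why the merge of take(1) and skip(1) wants a multicasted source.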