-->

RxJava 2 equivalent to isUnsubscribed

Posted 2020-03-23 11:29

Question:

I've been working through the examples in the book Reactive Programming with RxJava, which is targeted at version 1 not 2. An introduction to infinite streams has the following example (and notes there are better ways to deal with the concurrency):

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
    Runnable r = () -> {
        BigInteger i = ZERO;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    };
    new Thread(r).start();
});

...

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();

However, in RxJava 2, the parameter of the lambda expression passed to the create() method is of type ObservableEmitter, which doesn't have an isUnsubscribed() method. I've had a look in What's Different in 2.0 and also searched the repository, but can't find any such method.

How would this same functionality be achieved in 2.0?

Edited to include the solution as given below (n.b. using Kotlin):

val naturalNumbers = Observable.create<BigInteger> { emitter ->
    Thread({
        var int: BigInteger = BigInteger.ZERO
        while (!emitter.isDisposed) {
            emitter.onNext(int)
            int = int.add(BigInteger.ONE)
        }
    }).start()
}

val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }

Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()

Answer 1:

When you subscribe to an Observable, a Disposable is returned. You can store it in a local variable and call disposable.isDisposed() to check whether the subscription is still active.
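
To illustrate why polling a disposed flag ends the infinite loop, here is a minimal sketch in plain Java that needs no RxJava on the classpath. It is only a model of the idea, not RxJava's implementation: `Handle`, `subscribe` and `disposeStopsProducer` are hypothetical names invented for this example. `Handle` stands in for the `Disposable` returned by `subscribe()`, and the `while` loop mirrors the `isDisposed()` check in the question's solution.

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class DisposePollingSketch {

    /** Hypothetical stand-in for a Disposable; NOT the RxJava type. */
    static final class Handle {
        // AtomicBoolean makes dispose() on one thread visible to the producer thread.
        final AtomicBoolean disposed = new AtomicBoolean(false);
        Thread producer;

        void dispose() { disposed.set(true); }

        boolean isDisposed() { return disposed.get(); }
    }

    /** Starts a thread that emits 0, 1, 2, ... until the returned handle is disposed. */
    static Handle subscribe(Consumer<BigInteger> onNext) {
        Handle handle = new Handle();
        handle.producer = new Thread(() -> {
            BigInteger i = BigInteger.ZERO;
            // Same shape as the loop in the question: re-check the flag before each emission.
            while (!handle.isDisposed()) {
                onNext.accept(i);
                i = i.add(BigInteger.ONE);
            }
        });
        handle.producer.start();
        return handle;
    }

    /** Subscribes, waits, disposes, and reports whether the producer thread stopped. */
    static boolean disposeStopsProducer(long runMillis) {
        Handle handle = subscribe(x -> { /* discard values */ });
        try {
            Thread.sleep(runMillis);
            handle.dispose();
            handle.producer.join(1000); // allow up to one second for the loop to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handle.dispose();
            return false;
        }
        return handle.isDisposed() && !handle.producer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("producer stopped after dispose: " + disposeStopsProducer(5));
    }
}
```

In real RxJava 2 code the flag is managed by the library: the subscriber calls `dispose()` on the `Disposable` it received, and the producer inside `create()` observes that through `emitter.isDisposed()`, as in the Kotlin solution above.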