how to use worker thread in RxAndroid

2019-08-28 06:52发布

问题:

I am trying to use RxAndroid as shown in the below posted code. firstly, I know that to use .delay() I have to have it work on a worker thread through "Schedulers.io" but Schedulers class does not provide or have ".io" thread.How to use it

lib

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

code:

    Observable observable1 = Observable.just("2");
    Observable observable2 = Observable.just("7");
    Observable observable = Observable.zip(observable1, observable2, (i1, i2) -> {

        return i1 + ", " + i2;

    })
            .delay(10, TimeUnit.SECONDS, .....)//how to use a worker thread here
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Object value) {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

回答1:

Try this way -

Observable.just("long", "longer", "longest")
    .doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
    .subscribeOn(Schedulers.newThread())
    .map(String::length)
    .subscribe(length -> System.out.println("item length " + length));

Using .subscribeOn(Schedulers.newThread()) you can perform work on background thread.



回答2:

You can provide Schedulers.io() or Schedulers.computation() or any thread here. Below I modified your code and provided the answer.

Observable observable1 = Observable.just("2");
Observable observable2 = Observable.just("7");
Disposable d = 
  Observable.zip(
        observable1, observable2, (i1, i2) -> {
            return i1 + ", " + i2;
        }
    )
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Object value) {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });
}

One more thing I would like to mention here. Once an Observable is subscribed, it returns disposable. I have made that changes too in your code.

Hope this answer helps you.