My subscriber's onNext and onComplete function

2019-01-20 11:27发布

In an Android project that uses RxJava 2, I create a Flowable like this in the onCreate of my initial activity:

Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
        .doOnComplete(new MyAction())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new MySubscriber());

The implementation of the FlowableOnSubscribe is:

public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
    public static final String TAG = "XX MyFlOnSub1";

    @Override
    public void subscribe(FlowableEmitter<String> emitter) {
        Log.i(TAG, "subscribe");

        emitter.onNext("hello");
        emitter.onComplete();
    }
}

This is the subscriber implementation:

public class MySubscriber implements Subscriber<String> {
    public static final String TAG = "XX MySubscriber";

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
}

And the action implementation is:

public class MyAction implements Action {
    public static final String TAG = "XX MyAction";

    @Override
    public void run() {
        Log.i(TAG, "run");
    }
}

In my output, I'm expecting to a log statement from onNext, but I don't see one. Instead, this is my entire output:

02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run

This indicates that onNext never runs, and onComplete doesn't even run either. But MyAction runs successfully.

Here's what happens when I comment out the call to onNext:

02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete

In this case onNext of course doesn't run, but at least onComplete runs.

I expected that I would see onComplete run in both cases, and onNext run when I call emitter.onNext. What am I doing wrong here?

2条回答
可以哭但决不认输i
2楼-- · 2019-01-20 11:59

How are you testing it? Is it possible your main thread exits before the Observable has a chance to emit the result because you are using Schedulers.IO for the subscribe thread. Also, your observeOn will not do anything as it is only used for further downstream operators.

查看更多
成全新的幸福
3楼-- · 2019-01-20 12:07

You need to manually issue a request otherwise no data will be emitted when extending Subscriber directly:

@Override
public void onSubscribe(Subscription s) {
    Log.i(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE);
}

Alternatively, you could extend DisposableSubscriber or ResourceSubscriber.

查看更多
登录 后发表回答