
Rx Java Android : How to convert this callback blo

Posted 2019-01-29 12:51

Question:

I'm trying to upload a file via Amazon's S3 Android SDK. I've used RxJava a bit, but I'm not sure how to convert this method into one that returns an Observable, because I want to chain its result to another Observable call. What confuses me, I suppose, is that the method has nothing to return right away: the outcome is only known once onError or onStateChanged fires. How do I handle these situations in an Rx way?

public void uploadFile(TransferObserver transferObserver){

    transferObserver.setTransferListener(new TransferListener() {
        @Override
        public void onStateChanged(int id, TransferState state) {

        }

        @Override
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

        }

        @Override
        public void onError(int id, Exception ex) {

        }
  });

}

If someone could answer with RxJava 2 and lambdas, that would be great, because I keep coming up short on this one.

Answer 1:

This is generally the right approach for bridging the async/callback world to the reactive one, but using Observable.create() is now discouraged in RxJava 1.x, as it requires advanced knowledge to get right.
You should use the more recent creation method, Observable.fromEmitter(), which looks much the same:

    return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
        @Override
        public void call(Emitter<Integer> emitter) {

            transObs.setTransferListener(new TransferListener() {
                @Override
                public void onStateChanged(int id, TransferState state) {
                    if (state == TransferState.COMPLETED)
                        emitter.onCompleted();
                }

                @Override
                public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                }

                @Override
                public void onError(int id, Exception ex) {
                    emitter.onError(ex);
                }
            });
            emitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    // Deal with unsubscription:
                    // 1. unregister the listener to avoid memory leak
                    // 2. cancel the upload 
                }
            });
        }
    }, Emitter.BackpressureMode.DROP);

What was added here is handling of unsubscription (cancelling the upload and unregistering the listener to avoid memory leaks) and a backpressure strategy.
You can read more here.

Additional notes:

  • If you are interested in progress, you can call onNext() with the progress from onProgressChanged() and make the Observable an Observable<Integer>.
  • If not, you might want to consider Completable, which behaves like an Observable with no onNext() emissions, only onCompleted() or onError(). This suits your case if you are not interested in progress indications.
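
As a rough sketch of the first note: the listener's onProgressChanged() can forward a percentage to the emitter's onNext(). To keep it runnable without RxJava or the AWS SDK on the classpath, ProgressListener below is a hypothetical stand-in for the SDK's TransferListener, and the IntConsumer stands in for emitter::onNext. Emitting a 0-100 percentage, and 0 while the total is unknown, are assumptions; the answer does not say which value to emit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

class ProgressBridgeSketch {

    // Hypothetical stand-in for the progress callback of the SDK's TransferListener.
    interface ProgressListener {
        void onProgressChanged(int id, long bytesCurrent, long bytesTotal);
    }

    // Converts byte counts to a 0..100 percentage (assumption: 0 while the total is unknown).
    static int toPercent(long bytesCurrent, long bytesTotal) {
        if (bytesTotal <= 0) {
            return 0;
        }
        long percent = bytesCurrent * 100 / bytesTotal;
        return (int) Math.max(0, Math.min(100, percent));
    }

    // Builds a listener that forwards every progress callback to onNext
    // (onNext is a stand-in for emitter::onNext).
    static ProgressListener forwardingTo(IntConsumer onNext) {
        return (id, bytesCurrent, bytesTotal) ->
                onNext.accept(toPercent(bytesCurrent, bytesTotal));
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        ProgressListener listener = forwardingTo(seen::add);

        listener.onProgressChanged(1, 0, 200);
        listener.onProgressChanged(1, 50, 200);
        listener.onProgressChanged(1, 200, 200);

        System.out.println(seen); // prints [0, 25, 100]
    }
}
```

In the real bridge, the body of the lambda would sit inside onProgressChanged() of the TransferListener and call emitter.onNext(...) directly.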


Answer 2:

@Yosriz I could not get your code to compile, but you did help me quite a bit, so based on your answer here is what I now have:

    return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
        @Override
        public void call(AsyncEmitter<Integer> emitter) {

            transObs.setTransferListener(new TransferListener() {
                @Override
                public void onStateChanged(int id, TransferState state) {
                    if (state == TransferState.COMPLETED)
                        emitter.onCompleted();
                }

                @Override
                public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                }

                @Override
                public void onError(int id, Exception ex) {
                    emitter.onError(ex);
                }
            });

            emitter.setCancellation(new AsyncEmitter.Cancellable() {
                @Override
                public void cancel() throws Exception {
                    transObs.cleanTransferListener();
                }
            });
        }
    }, AsyncEmitter.BackpressureMode.DROP);
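
A minimal sketch of what the cancellation hook does, assuming nothing beyond the wiring shown above. FakeTransfer, Listener, State and RecordingEmitter are hypothetical stand-ins for TransferObserver, TransferListener, TransferState and the RxJava emitter, so that it runs without either library; only setTransferListener() and cleanTransferListener() mirror method names used in the code above.

```java
import java.util.ArrayList;
import java.util.List;

class CancellationBridgeSketch {

    // Hypothetical stand-in for TransferState (only the values used here).
    enum State { IN_PROGRESS, COMPLETED }

    // Hypothetical stand-in for TransferListener (progress callback omitted).
    interface Listener {
        void onStateChanged(int id, State state);
        void onError(int id, Exception ex);
    }

    // Hypothetical stand-in for TransferObserver: holds at most one listener.
    static class FakeTransfer {
        Listener listener;
        void setTransferListener(Listener l) { listener = l; }
        void cleanTransferListener() { listener = null; }
    }

    // Hypothetical stand-in for the emitter: records events and stores the cancel action.
    static class RecordingEmitter {
        final List<String> events = new ArrayList<>();
        Runnable cancellation = () -> { };
        void onCompleted() { events.add("completed"); }
        void onError(Throwable t) { events.add("error: " + t.getMessage()); }
        void setCancellation(Runnable action) { cancellation = action; }
        void unsubscribe() { cancellation.run(); }
    }

    // Same wiring as the body of call(emitter) above.
    static void bridge(FakeTransfer transfer, RecordingEmitter emitter) {
        transfer.setTransferListener(new Listener() {
            @Override
            public void onStateChanged(int id, State state) {
                if (state == State.COMPLETED) {
                    emitter.onCompleted();
                }
            }

            @Override
            public void onError(int id, Exception ex) {
                emitter.onError(ex);
            }
        });
        emitter.setCancellation(transfer::cleanTransferListener);
    }

    public static void main(String[] args) {
        FakeTransfer transfer = new FakeTransfer();
        RecordingEmitter emitter = new RecordingEmitter();
        bridge(transfer, emitter);

        transfer.listener.onStateChanged(1, State.IN_PROGRESS); // ignored by the bridge
        transfer.listener.onStateChanged(1, State.COMPLETED);   // forwarded as completion
        emitter.unsubscribe();                                   // runs the cancellation action

        System.out.println(emitter.events);            // prints [completed]
        System.out.println(transfer.listener == null); // prints true
    }
}
```

For the RxJava 2 version asked for in the question, the corresponding pieces are Completable.create(emitter -> ...), emitter.onComplete() and emitter.setCancellable(...), all of which accept lambdas.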