
RxJava: How to express doOnFirst()?

Posted 2019-05-11 13:48

Question:

I am using RxJava and I have an Observable with multiple items inside. What I would like to do is run function A on the first item, function B on all of them and function C when the Observable is completed:

-----1-----2-----3-----|-->
     |     |     |     |
     run A |     |     |
     |     |     |     |
     run B run B run B |
                       |
                       run C

Is there a clever way of expressing this with lambda functions? I already have the following solution, but it looks ugly and I suspect there is a better way to do this:

observable.subscribe(
        new Action1<Item>() {
            boolean first = true;

            @Override
            public void call(Item item) {
                if (first) {
                    runA(item);
                    first = false;
                }
                runB(item);
            }
        },
        throwable -> {},
        () -> runC());
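
For comparison, the first-item check in the snippet above can be packaged as a small reusable consumer so that the call site stays lambda-only. This is a minimal plain-Java sketch built on java.util.function.Consumer, not an RxJava API: the class name FirstThenEach and its of method are hypothetical, and with RxJava 1 the result would still need adapting to Action1 (for instance through a method reference).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of RxJava.
final class FirstThenEach {

    private FirstThenEach() {}

    // Returns a consumer that calls onFirst for the first item only
    // and onEach for every item, including the first.
    static <T> Consumer<T> of(Consumer<? super T> onFirst, Consumer<? super T> onEach) {
        AtomicBoolean first = new AtomicBoolean(true);
        return item -> {
            if (first.compareAndSet(true, false)) {
                onFirst.accept(item);
            }
            onEach.accept(item);
        };
    }

    // Feeds 1, 2, 3 through the combined consumer and returns the call log.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Consumer<Integer> onNext = of(i -> log.add("A" + i), i -> log.add("B" + i));
        for (int i = 1; i <= 3; i++) {
            onNext.accept(i);
        }
        return log;
    }
}
```

The flag lives inside the returned consumer, so a fresh consumer has to be created for each subscription; reusing one instance across subscriptions would run the first-item action only once overall.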

Answer 1:

Use Observable.defer to encapsulate per-subscription state (here, a boolean that indicates whether we are on the first item).

Here's a runnable class that demonstrates its use:

import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Observable<Integer> o = 
            Observable.just(1, 2, 3)
                .compose(doOnFirst(System.out::println));
        // will print 1
        o.subscribe();
        // will print 1
        o.subscribe();
    }

    public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
        return o -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return o.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    action.call(t);
                }
            });
        });
    }

}

Although the OP asked about RxJava 1, here is the same solution for RxJava 2:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Flowable<Integer> f =
                Flowable.just(1, 2, 3)
                        .compose(doOnFirst(System.out::println));
        // will print 1
        f.subscribe();
        // will print 1
        f.subscribe();
    }

    public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return f -> Flowable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return f.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }
}
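
The role defer plays in both versions above can be modelled without RxJava. The following is only a sketch under stated assumptions: Source is a hypothetical stand-in for Observable/Flowable, and DeferModel is an illustrative name. It shows that creating the flag inside subscribe gives every subscription its own "first" state, so the action fires once per subscription rather than once overall.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Plain-Java model of the idea; Source is a hypothetical stand-in
// for Observable/Flowable and is not an RxJava type.
final class DeferModel {

    interface Source<T> {
        void subscribe(Consumer<? super T> downstream);
    }

    static <T> Source<T> doOnFirst(Source<T> upstream, Consumer<? super T> action) {
        return downstream -> {
            // Created inside subscribe, so each subscription gets its own flag.
            AtomicBoolean first = new AtomicBoolean(true);
            upstream.subscribe(t -> {
                if (first.compareAndSet(true, false)) {
                    action.accept(t);
                }
                downstream.accept(t);
            });
        };
    }

    // Subscribes twice to the same wrapped source and returns
    // the items the first-item action was invoked with.
    static List<Integer> demo() {
        List<Integer> firsts = new ArrayList<>();
        Source<Integer> source = downstream -> {
            for (int i = 1; i <= 3; i++) {
                downstream.accept(i);
            }
        };
        Source<Integer> wrapped = doOnFirst(source, firsts::add);
        wrapped.subscribe(i -> {});
        wrapped.subscribe(i -> {});
        return firsts;
    }
}
```

If the AtomicBoolean were instead created once, outside the returned lambda, the second subscription would find the flag already cleared and skip the action.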


Answer 2:

There's doOnNext and doOnTerminate (or similar) already built in, so all that's missing is performing an action on just the first item. One way is to publish the stream: inside the publish selector the stream proceeds as normal, while a separate inner subscription listens only for the first event (with first) and performs the action when it arrives. Here's an example:

observable.publish(new Func1<Observable<Item>, Observable<Item>>() {
    @Override
    public Observable<Item> call(Observable<Item> itemObservable) {
        itemObservable.first().subscribe((Item item) -> runA(item));
        return itemObservable;
    }
}).subscribe(/* some actions and subscription as usual ... */);

If this looks too verbose, you could put it in a utility Transformer and preserve some of the builder syntax. For example:

public static class Transformers<T> implements Observable.Transformer<T, T> {
    private final Action1<T> action1;

    private Transformers(final Action1<T> action1) {
        this.action1 = action1;
    }
    // static factory so callers don't have to spell out the type parameter
    public static <T> Observable.Transformer<T, T> doOnFirst(final Action1<T> action1) {
        return new Transformers<T>(action1);
    }

    @Override
    public Observable<T> call(final Observable<T> observable) {
        return observable.publish(new Func1<Observable<T>, Observable<T>>() {
            @Override
            public Observable<T> call(Observable<T> observable) {
                observable.first().subscribe(action1);
                return observable;
            }
        });
    }
}

Then you can call it like so:

observable
    .compose(Transformers.doOnFirst((Item item) -> runA(item)))
    .subscribe(/* chain and subscribe as usual... */);

Obviously this all looks much nicer with lambda syntax; the above is my guess at how it would look.



Answer 3:

I guess I've found an easy solution for this myself:

Observable<Integer> observable = Observable.just(1, 2, 3).share();
observable.take(1).subscribe(this::runA);
observable.subscribe(
    this::runB,
    throwable -> {},
    this::runC);

This works single-threaded, and it seems to work multi-threaded too, though I have to admit I'm not yet confident about that.