RxJava2 Publish

Posted 2020-06-18 03:12

Question:

What is the difference between

ObservableTransformer {
    Observable.merge(
        it.ofType(x).compose(transformerherex),
        it.ofType(y).compose(transformerherey)
    )
}

and

ObservableTransformer {
    it.publish{ shared ->
        Observable.merge(
            shared.ofType(x).compose(transformerherex),
            shared.ofType(y).compose(transformerherey)
        )
    }
}

When I run my code with either of these, I get the same results. What does publish do here?

Answer 1:

The difference is that the first transformer subscribes to the upstream twice for each subscription from the downstream, duplicating any side effects of the upstream, which is usually not wanted:

Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
      .doOnSubscribe(s -> System.out.println("Subscribed!"));


mixedSource.compose(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

will print

Subscribed!
2
3
4
Subscribed!
A
B
C

The side effect here is the printout "Subscribed!". In a real source, the duplicated work could mean sending an email twice or retrieving the rows of a table twice. This example also shows that, even though the source values are interleaved by type, the output groups them separately, because the synchronous source replays all of its items to the first arm before merge subscribes the second arm.

In contrast, publish(Function) establishes one subscription to the source per downstream subscriber, so any side effects at the source happen only once.

mixedSource.publish(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

which prints

Subscribed!
A
2
B
3
C
4

because the source is subscribed once and each item is multicast to the two "arms" of the .ofType().compose().
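
To make the mechanism concrete without depending on RxJava, here is a minimal plain-Java sketch of the two wiring styles. It is only a model of the idea, not RxJava's implementation: the names PublishModel, subscribe, integerArm and stringArm are hypothetical, and the "source" is just a method that replays all items to whoever calls it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model (NOT RxJava's implementation) of re-subscribing vs. multicasting.
class PublishModel {
    // Counts how often the "cold" source was subscribed to (the side effect).
    static int subscriptions = 0;

    // A cold source: every call replays all items to the given observer.
    static void subscribe(Consumer<Object> observer) {
        subscriptions++;
        for (Object item : new Object[] {"a", 1, "b", 2, "c", 3}) {
            observer.accept(item);
        }
    }

    // The two "arms": integers are incremented, strings are upper-cased.
    static Consumer<Object> integerArm(List<Object> out) {
        return item -> { if (item instanceof Integer) out.add((Integer) item + 1); };
    }

    static Consumer<Object> stringArm(List<Object> out) {
        return item -> { if (item instanceof String) out.add(((String) item).toUpperCase()); };
    }

    // Models merge(f.ofType(..), f.ofType(..)): each arm subscribes to the source itself.
    static List<Object> withoutPublish() {
        List<Object> out = new ArrayList<>();
        subscribe(integerArm(out));
        subscribe(stringArm(out));
        return out;
    }

    // Models publish(f -> merge(..)): one subscription, each item handed to both arms.
    static List<Object> withPublish() {
        List<Object> out = new ArrayList<>();
        Consumer<Object> intArm = integerArm(out);
        Consumer<Object> strArm = stringArm(out);
        subscribe(item -> { intArm.accept(item); strArm.accept(item); });
        return out;
    }

    public static void main(String[] args) {
        // prints: [2, 3, 4, A, B, C] after 2 subscription(s)
        System.out.println(withoutPublish() + " after " + subscriptions + " subscription(s)");
        subscriptions = 0;
        // prints: [A, 2, B, 3, C, 4] after 1 subscription(s)
        System.out.println(withPublish() + " after " + subscriptions + " subscription(s)");
    }
}
```

The model reproduces both the duplicated side effect and the difference in output order shown above. It leaves out everything else the real operator handles, such as disposal, errors and completion.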



Answer 2:

The no-argument publish() operator converts your Observable into a Connectable Observable. (The publish(Function) overload used in the question returns a plain Observable instead.)

Let's see what a Connectable Observable is. Suppose you want to subscribe to an observable multiple times and serve the same items to each subscriber. For that you need a Connectable Observable.

Example (C#, Rx.NET):

var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Connect();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));

output:

first subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2

In this case, we are quick enough to subscribe before the first item is published, but only on the first subscription. The second subscription subscribes late and misses the first publication.

We could defer the invocation of the Connect() method until after all subscriptions have been made. That way, even with the call to Thread.Sleep, we will not actually subscribe to the underlying sequence until after both subscriptions are made. This would be done as follows:

var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
observable.Connect();

output:

first subscription : 0 
second subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2 

So using a Connectable Observable, we have a way to control when the Observable starts emitting items.
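
Since the question is about RxJava, here is a rough plain-Java model of the two orderings above. It is a sketch under simplifying assumptions, not how Rx implements publish and connect: the class ConnectableModel and its tick() method are hypothetical, and a manual tick() stands in for one timer period so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model (NOT the Rx implementation) of a connectable source.
class ConnectableModel {
    private final List<Consumer<Long>> observers = new ArrayList<>();
    private boolean connected = false;
    private long next = 0;

    // Registering an observer does not start the source.
    void subscribe(Consumer<Long> observer) {
        observers.add(observer);
    }

    // Only after connect() does the source start producing items.
    void connect() {
        connected = true;
    }

    // One simulated period passes; the item goes to current observers only, with no replay.
    void tick() {
        if (!connected) {
            return;
        }
        long item = next++;
        for (Consumer<Long> observer : observers) {
            observer.accept(item);
        }
    }

    // Connect first: the second observer arrives after item 0 and misses it.
    static List<String> connectEarly() {
        List<String> log = new ArrayList<>();
        ConnectableModel source = new ConnectableModel();
        source.connect();
        source.subscribe(i -> log.add("first subscription : " + i));
        source.tick();
        source.subscribe(i -> log.add("second subscription : " + i));
        source.tick();
        source.tick();
        return log;
    }

    // Connect last: nothing is emitted until both observers are registered.
    static List<String> connectLate() {
        List<String> log = new ArrayList<>();
        ConnectableModel source = new ConnectableModel();
        source.subscribe(i -> log.add("first subscription : " + i));
        source.tick(); // models the Thread.Sleep: time passes, but the source is not connected
        source.subscribe(i -> log.add("second subscription : " + i));
        source.connect();
        source.tick();
        source.tick();
        source.tick();
        return log;
    }

    public static void main(String[] args) {
        connectEarly().forEach(System.out::println);
        System.out.println("---");
        connectLate().forEach(System.out::println);
    }
}
```

Running main prints the same two sequences as the outputs above: in the first the second subscription starts at item 1, in the second both subscriptions start at item 0.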

Example taken from : http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

EDIT: According to the 180th slide in this link:

Another property of publish is that if an observer starts observing 10 seconds after the observable started emitting items, it receives only the items emitted after those 10 seconds (that is, from the time of its subscription), not all the items. As far as I understand from the slides, publish is used for UI events. It makes sense that an observer should receive only the events that occur after it has subscribed, not all the events that happened before.

Hope it helps.