RxJava - Merged Observable that accepts more Obser

Posted 2019-08-25 22:54

Question:

I need an Observable implementation that holds one or more Observables and merges them. But here is the kicker: I want to add more Observables to be merged at any time, and it might as well support removing them too.

For it to be truly effective, all Subscribers must receive notifications from new Observables that are added post-subscription. If all the merged Observables are cold and call onCompleted(), then I guess it is okay to let the subscriptions end even if more Observables are added afterwards. This is more for merging multiple infinite hot Observables and being able to add more at any time.

MergableObservable<MyEvent> allSources = new MergableObservable<>();

//later in application
Observable<MyEvent> eventSource1 = ...
allSources.add(eventSource1);

//and later again
Observable<MyEvent> eventSource2 = ...
allSources.add(eventSource2);

//and so on
Observable<MyEvent> eventSource3 = ...
allSources.add(eventSource3);

I know there are merging operators, but I need a mutable structure. Am I missing something that already exists? I'd prefer to not use subjects unless it is absolutely appropriate for this situation.

Answer 1:

You can't avoid Subjects because you want to manually push new Observable sources and not generate them "naturally".

Subject<Observable<T>, Observable<T>> o = PublishSubject
    .<Observable<T>>create().toSerialized();

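// Any thread-safe Set works here, e.g. ConcurrentHashMap.newKeySet()
// (the JDK has no ConcurrentHashSet class).
Set<Observable<T>> live = ...

// Relay each inner source only while it is still in the live set.
o.flatMap(v -> v.takeWhile(x -> live.contains(v))).subscribe(...);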

Observable<T> inner = ...
live.add(inner);
o.onNext(inner);

//...

// takeWhile re-checks the set on inner's next emission and drops it then.
live.remove(inner);
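
To make the mechanics above easier to follow without the library, here is a minimal plain-Java sketch of the same idea: one hub that relays values from a mutable set of live sources to every subscriber. This is not RxJava code. HotSource and MergeableSource are hypothetical names invented for this illustration, and the sketch assumes synchronous hot sources and ignores errors, completion, and backpressure.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a hot source: pushes values to whoever is listening. */
final class HotSource<T> {
    private final List<Consumer<? super T>> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener and returns a handle that unregisters it. */
    Runnable listen(Consumer<? super T> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    /** Pushes one value to every current listener. */
    void emit(T value) {
        for (Consumer<? super T> listener : listeners) {
            listener.accept(value);
        }
    }
}

/** Hypothetical mutable merge: sources can be added or removed after subscription. */
final class MergeableSource<T> {
    private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();
    // Maps each live source to the handle that detaches it from this hub.
    private final Map<HotSource<T>, Runnable> live = new ConcurrentHashMap<>();
    private final Object lock = new Object();

    /** Subscribers registered here also see sources that are added later. */
    Runnable subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    /** Starts relaying the source; adding the same source twice has no effect. */
    void add(HotSource<T> source) {
        live.computeIfAbsent(source, s -> s.listen(this::dispatch));
    }

    /** Stops relaying the source immediately. */
    void remove(HotSource<T> source) {
        Runnable detach = live.remove(source);
        if (detach != null) {
            detach.run();
        }
    }

    // Serializes delivery so subscribers never see interleaved calls,
    // which is the role toSerialized() plays in the snippet above.
    private void dispatch(T value) {
        synchronized (lock) {
            for (Consumer<? super T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }
}
```

One difference from the flatMap/takeWhile version: remove() here detaches the source right away, whereas takeWhile only notices the removal when the inner source emits its next item.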


Answer 2:

Maybe Subjects are the most reasonable way to handle this. I'm open to other answers if anybody knows a better way. It kind of kills me there is not a built-in way to do this. I imagine it would be a common need.

Kotlin Implementation

class MergingObservable<T> {

    // PublishSubject has no public constructor; use the create() factory.
    private val subject: SerializedSubject<T, T> = PublishSubject.create<T>().toSerialized()

    fun toObservable(): Observable<T> = subject

    operator fun plusAssign(observable: Observable<T>) {
        add(observable)
    }

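    // Subscribing the subject directly also forwards onCompleted/onError,
    // so a source that terminates will terminate the merged stream too.
    fun add(observable: Observable<T>): Subscription = observable.subscribe(subject)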
}