How to create an observable sequence where part of

Posted 2019-07-25 22:39

Question:

I have a fairly complex feature I want to implement reactively. Whether this feature is enabled depends on the following:

  1. User preference for this feature has been enabled (user can toggle this feature on or off)
  2. Activity is started (Activity#onStart()). If you aren't familiar with Android, this just means the view is visible to the user.

Here are the observables I already have:

  1. Observable<Boolean> - a user preference for whether this feature is enabled. true if enabled, false if disabled.
  2. Observable<LifeCycleEvent> - LifeCycleEvent is an enum that relates to Android's activity lifecycle.

  3. Observable<Void> and Observable<Void>, which, when subscribed to, each kick off a processing-intensive task that enables my feature. These two observables need to be:

    • subscribed to when #2 emits an event for which LifeCycleEvent#isStarted() returns true and #1 emits true (user enabled the feature)
    • unsubscribed from when #2 emits an event for which LifeCycleEvent#isStarted() returns false
    • unsubscribed from when #2 emits an event for which LifeCycleEvent#isDestroyed() returns true. I will also need to unsubscribe from #1 and #2 here, because that means this class is ready for garbage collection.
    • unsubscribed from when #1 emits false (user did not enable this feature)
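
Read together, the four rules above reduce to a single condition: the two task observables should be subscribed exactly while the preference is on, the activity is started, and it has not been destroyed. A minimal sketch of that condition follows; the class name FeatureRule and the method shouldRun are hypothetical, introduced only for illustration.

```java
class FeatureRule {
    // Hypothetical helper: true only while the preference is enabled,
    // the activity is started, and the activity has not been destroyed.
    static boolean shouldRun(boolean featureEnabled, boolean started, boolean destroyed) {
        return featureEnabled && started && !destroyed;
    }

    public static void main(String[] args) {
        System.out.println(shouldRun(true, true, false));   // true:  subscribe
        System.out.println(shouldRun(true, false, false));  // false: activity stopped
        System.out.println(shouldRun(false, true, false));  // false: preference off
        System.out.println(shouldRun(true, true, true));    // false: activity destroyed
    }
}
```

Any reactive solution then only has to subscribe when this value flips to true and unsubscribe when it flips to false.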

How can I best do this? I have a single class that will take the above-mentioned dependencies and a single method called void startObserving() that will kick the observation process off (and be called once for the lifetime of this instance). I think I will need to somehow maintain two subscriptions (one for #1 and #2, and another for #3).

Answer 1:

Oooh, this is a puzzler, I think:

Observable<Boolean> featureEnabled = ...;
// Every lifecycle event is passed through; repeated states are collapsed further down the chain.
Observable<LifeCycleEvent> lifecycle = ...;
Observable<Void> processA = ...;
Observable<Void> processB = ...;

Observable<Void> processes = Observable.merge(processA, processB);

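// Pair is assumed to be org.apache.commons.lang3.tuple.Pair.
Observable.combineLatest(featureEnabled, lifecycle, (feature, event) ->
       // left: should the processes run? right: has the activity been destroyed?
       Pair.of(feature && event.isStarted(), event.isDestroyed()))
   .takeUntil(pair -> pair.getRight())   // pass the destroyed event through, then complete
   .map(Pair::getLeft)
   .distinctUntilChanged()               // react only when the flag actually flips
   // Switching to empty() unsubscribes from the running processes.
   .switchMap(flag -> flag ? processes : Observable.<Void>empty())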
   .subscribe(....)
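
To make the behaviour of this chain easier to follow, here is a rough plain-Java model of it. It does not use RxJava: the class FeatureGate, its methods, and the switched list are all hypothetical names, and each step is only an approximation of the operator named in its comment. A true entry in switched stands for switchMap subscribing to the processes; a false entry stands for switching to the empty observable, which unsubscribes from them.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the operator chain above; a sketch, not RxJava.
class FeatureGate {
    private boolean featureSeen = false;   // combineLatest waits for one value per source
    private boolean lifecycleSeen = false;
    private boolean feature, started, destroyed;
    private boolean terminated = false;    // set once a destroyed event arrives (takeUntil)
    private Boolean lastFlag = null;       // last value passed on (distinctUntilChanged)
    final List<Boolean> switched = new ArrayList<>(); // values that reach switchMap

    void onFeature(boolean enabled) {
        if (terminated) return;
        feature = enabled;
        featureSeen = true;
        emit();
    }

    void onLifecycle(boolean isStarted, boolean isDestroyed) {
        if (terminated) return;
        started = isStarted;
        destroyed = isDestroyed;
        lifecycleSeen = true;
        emit();
    }

    private void emit() {
        if (!featureSeen || !lifecycleSeen) return;        // combineLatest
        boolean flag = feature && started;
        if (destroyed) terminated = true;                  // takeUntil: this item is the last
        if (lastFlag != null && lastFlag == flag) return;  // distinctUntilChanged
        lastFlag = flag;
        switched.add(flag);                                // switchMap sees this value
    }

    public static void main(String[] args) {
        FeatureGate gate = new FeatureGate();
        gate.onFeature(true);            // nothing yet: no lifecycle event seen
        gate.onLifecycle(true, false);   // started: flag becomes true
        gate.onLifecycle(true, false);   // still started: suppressed
        gate.onFeature(false);           // preference off: flag becomes false
        gate.onFeature(true);            // preference on again: flag becomes true
        gate.onLifecycle(false, false);  // stopped: flag becomes false
        gate.onLifecycle(false, true);   // destroyed: flag unchanged, chain terminates
        gate.onFeature(true);            // ignored after termination
        System.out.println(gate.switched); // [true, false, true, false]
    }
}
```

The model also shows why a single subscription is enough here: once the destroyed event arrives, the whole chain completes, so nothing upstream keeps listening.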