I want to create observables that do the following:
- buffer all items, while they are paused
- immediately emit items, while they are not paused
- the pause/resume trigger must come from another observable
- it must be safe to use with observables that do not run on the main thread, and it must be safe to change the paused/resumed state from the main thread
I want to use a BehaviorSubject<Boolean> as the trigger and bind this trigger to an activity's onResume and onPause events. (Code example appended)
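To make the requirements above concrete, here is a minimal plain-Java sketch of the intended behaviour. It deliberately uses no RxJava: PausableValve, onNext and setResumed are hypothetical names chosen for illustration, and the sketch assumes non-null items and a downstream callback that is cheap enough to run while a lock is held.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava: buffers items while paused and
// forwards them immediately while resumed. Both methods are synchronized, so
// items and pause/resume signals may arrive from different threads.
final class PausableValve<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private boolean resumed;

    PausableValve(Consumer<T> downstream, boolean initiallyResumed) {
        this.downstream = downstream;
        this.resumed = initiallyResumed;
    }

    // Called by the source, possibly from a background thread.
    synchronized void onNext(T item) {
        if (resumed) {
            downstream.accept(item);   // not paused: deliver right away
        } else {
            buffer.add(item);          // paused: keep for later, in arrival order
        }
    }

    // Called by the trigger (true = resumed, false = paused).
    synchronized void setResumed(boolean nowResumed) {
        resumed = nowResumed;
        if (nowResumed) {
            // Drain everything that was queued while paused, oldest first.
            T item;
            while ((item = buffer.poll()) != null) {
                downstream.accept(item);
            }
        }
    }
}
```

In this sketch the downstream callback runs on whichever thread calls onNext or setResumed; delivering on the main thread would still need something like observeOn(AndroidSchedulers.mainThread()) further down the chain.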
Question
I've set up something, but it is not working as intended. I use it as follows:
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()));
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
Currently the problem is that Variant 1 should work fine, but sometimes the events are just not emitted: the valve is not emitting, until the valve everything is working (may be a threading problem...)! Variant 2 is much simpler and seems to work, but I'm not sure it is really better; I don't think so. I don't know why Variant 1 sometimes fails, so I can't tell whether Variant 2 actually solves the (to me still unknown) problem.
Can someone tell me what the problem could be, or whether the simple solution should work reliably? Or show me a reliable solution?
Code
RxValve
https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08
RXPauser functions
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed),
                    aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource
            .window(pauser.distinctUntilChanged().filter(isResumed -> isResumed),
                    aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
Activity
public class BaseActivity extends AppCompatActivity {

    // true = resumed, false = paused; starts in the paused state
    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    public BaseActivity()
    {
        super();
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public BehaviorSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}
I made a similar thing for logging events.
A Subject collects events and pushes them to the server once every 10 seconds.
The main idea: say you have a class Event. First, create a queue for the events. It can be a BehaviorSubject; it doesn't matter.
Then create the logic that handles pushing the events to the server.
Then subscribe to it and retain the subscription until you no longer need it.
Hope this helps.
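The idea in this answer (queue the events, then push them out in batches at a fixed interval) can be sketched roughly as follows. This is not the answer's original RxJava code: it is a plain-Java stand-in, and EventBatcher, log, flush, start and the sender callback are all hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: events are queued from any thread and handed to
// `sender` (e.g. an upload call) in batches.
final class EventBatcher<E> {
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<List<E>> sender;

    EventBatcher(Consumer<List<E>> sender) {
        this.sender = sender;
    }

    // Enqueue an event; safe to call from any thread.
    void log(E event) {
        queue.add(event);
    }

    // Drain everything queued so far and send it as one batch.
    // Synchronized so that two concurrent flushes cannot interleave batches.
    synchronized void flush() {
        List<E> batch = new ArrayList<>();
        E event;
        while ((event = queue.poll()) != null) {
            batch.add(event);
        }
        if (!batch.isEmpty()) {
            sender.accept(batch);
        }
    }

    // Flush periodically on a background thread. Shut the returned executor
    // down when it is no longer needed (the counterpart of unsubscribing).
    ScheduledExecutorService start(long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::flush, period, period, unit);
        return scheduler;
    }
}
```

With this sketch, calling start(10, TimeUnit.SECONDS) would give the "once every 10 seconds" behaviour described above; keeping the returned executor around and calling shutdown() on it plays the role of retaining and later releasing the subscription.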
You can actually use the .buffer() operator, passing it an observable that defines when to stop buffering. There is a sample in the book Intro to RxJava, Part 3, 'Taming the sequence', chapter 5, 'Time-shifted sequences': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md
You can use PublishSubject with asObservable to feed it elements in your custom operator. Every time you need to start buffering, create an instance with Observable.defer(() -> createBufferingValve()).