I am looking for an operator to debounce a series of events, say a user's clicks. The input and output should look like this:
interval : -> <- -> <-
in : 1--2--3-------4--5--5--6-7-8--------
out : 1-------------4---------------------
The idea is like underscore's debounce with the immediate option on: http://underscorejs.org/#debounce. The operator can be presented/implemented in any language that supports Reactive Extensions.
Edit: To clarify the interval, let's say it is 5 seconds (5 spaces between the two arrows): -> <-
Edit2: A more understandable version: I have a user who clicks a button repeatedly (1, 2, 3); I want to catch the first click (1) and ignore the rest. After a while he gets tired and rests for 7 seconds (which is longer than the 5-second interval between the two arrows), then starts clicking the button again (4, 5, 6, 7, 8). I want to catch the first click (4) and ignore the rest.
If he clicks after the fourth arrow, I want to catch that click, too.
Edit3: Here is an image which can be found at the original article
Edit: Based on the clarifications, RxJava doesn't have an operator for this type of flow but it can be composed from a non-trivial set of other operators:
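import java.util.concurrent.TimeUnit;

import rx.Observable;

public class DebounceFirst {

    public static void main(String[] args) {
        Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
            // Emit each value v after v milliseconds, so the values double as timestamps.
            .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
            .doOnNext(v -> System.out.println("T=" + v))
            .compose(debounceFirst(500, TimeUnit.MILLISECONDS))
            .toBlocking()
            .subscribe(v -> System.out.println("Debounced: " + v));
    }

    static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
        return f ->
            // publish() shares a single subscription to the source among the inner sequences.
            f.publish(g ->
                // Let the first item of a burst through.
                g.take(1)
                .concatWith(
                    // Then wait, emitting nothing, until `timeout` elapses after the latest item.
                    g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
                    .take(1)
                    .ignoreElements()
                )
                // Resubscribe for the next burst; stop repeating once the source completes.
                .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
            );
    }
}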
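To make the rule from the question concrete without pulling in any Rx library, here is a minimal sketch in plain JavaScript that works on a list of timestamps instead of a live stream. The function name `debounceFirstSim` and the choice of a strict `>` comparison at the boundary are my own assumptions for illustration; this is a model of the requested behaviour, not a reimplementation of the RxJava transformer above.

```javascript
// Leading-edge debounce over timestamps (a plain simulation, not Rx).
// An event is kept when it is the very first one, or when the gap since
// the previous event (kept or not) is longer than quietMs.
function debounceFirstSim(timestamps, quietMs) {
  const out = [];
  let previous = null; // time of the most recent event seen so far
  for (const t of timestamps) {
    if (previous === null || t - previous > quietMs) {
      out.push(t);
    }
    previous = t; // every event, kept or ignored, restarts the quiet period
  }
  return out;
}

// Same sample timestamps (in ms) as the RxJava example, with a 500 ms quiet period.
const sample = [0, 100, 200, 1500, 1600, 1800, 2000, 10000];
console.log(debounceFirstSim(sample, 500)); // [ 0, 1500, 10000 ]
```

Each burst contributes only its first timestamp, which matches the "catch 1, then catch 4" description in the question.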
The behavior you want is not what the debounce operator does in Rx.
This is called throttle, throttleTime or throttleWithTimeout (however, it falls under the debounce category of operators). I don't know which language you use, but in RxJS it looks like the following image:
See http://reactivex.io/documentation/operators/debounce.html.
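For comparing a fixed-window throttle with the behaviour asked for in the question, a small timestamp-based sketch may help. Both functions below are hypothetical helpers written for this comparison (`throttleFirstSim` and `leadingDebounceSim` are not library functions), and the throttle model assumes the window opens at each emitted event; real operators may treat boundaries differently.

```javascript
// Fixed-window throttle: keep an event, then ignore everything that
// arrives less than windowMs after the kept event.
function throttleFirstSim(timestamps, windowMs) {
  const out = [];
  let windowStart = null; // time of the last kept event
  for (const t of timestamps) {
    if (windowStart === null || t - windowStart >= windowMs) {
      out.push(t);
      windowStart = t;
    }
  }
  return out;
}

// Leading-edge debounce: keep an event only if the gap since the
// previous event (kept or not) is longer than quietMs.
function leadingDebounceSim(timestamps, quietMs) {
  const out = [];
  let previous = null;
  for (const t of timestamps) {
    if (previous === null || t - previous > quietMs) {
      out.push(t);
    }
    previous = t;
  }
  return out;
}

// A burst of clicks that lasts longer than the 500 ms window, then a pause.
const clicks = [0, 100, 200, 400, 600, 800, 2000];
console.log(throttleFirstSim(clicks, 500));   // [ 0, 600, 2000 ]
console.log(leadingDebounceSim(clicks, 500)); // [ 0, 2000 ]
```

In this model the two only differ when a burst outlasts the window: the throttle lets another click through mid-burst (600 here), while the rule from the question stays silent until the clicking pauses.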
Because debounce() is inherently asynchronous, you need to bring the result back to the current thread explicitly.
seriesOfUnfortunateEvents
    .debounce(14, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.immediate())
    .subscribe(v -> yourStuff());
According to the documentation, there are two debounce operators in RxJS. You might be interested in debounceTime in particular.
debounceTime
From the documentation:
Emits a value from the source Observable only after a particular time span has passed without another source emission.
Example:
Rx.Observable
    .fromEvent(document.querySelector('button'), 'click')
    .debounceTime(200)
    // mapTo takes the constant value itself, not a function returning it
    .mapTo('clicked!')
    .subscribe(v => console.log(v));
It will emit a single clicked! once the button has gone without a further click for the given timespan (200 ms in this example), no matter how many clicks came before.
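Note that this is the trailing-edge behaviour: the last click of a burst is reported, and only after the quiet period. A rough way to see that without a browser is to simulate it over timestamped events. The helper name `debounceTimeSim` and the event shape `{ time, value }` are made up for this sketch, and it assumes the source stays open after the last event (so the last value is also delayed by the wait time).

```javascript
// Trailing-edge debounce over timestamped events (a simulation, not RxJS).
// An event is emitted, waitMs late, only if no other event follows it
// within waitMs. Events exactly waitMs apart are treated as separate bursts here.
function debounceTimeSim(events, waitMs) {
  const out = [];
  for (let i = 0; i < events.length; i++) {
    const current = events[i];
    const next = events[i + 1];
    if (next === undefined || next.time - current.time >= waitMs) {
      out.push({ value: current.value, at: current.time + waitMs });
    }
  }
  return out;
}

// Eight clicks, numbered 1..8, at these times in ms.
const times = [0, 100, 200, 1500, 1600, 1800, 2000, 10000];
const events = times.map((time, i) => ({ time, value: i + 1 }));

const emitted = debounceTimeSim(events, 500);
console.log(emitted.map(e => `${e.value}@${e.at}`)); // [ '3@700', '7@2500', '8@10500' ]
```

So with debounceTime the question's first burst would yield click 3, not click 1, which is the opposite edge from what was asked for.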
debounce
From the documentation:
Emits a value from the source Observable only after a particular time span determined by another Observable has passed without another source emission.