Still figuring out a proper use of different Rx* operators and stumbled upon the following problem:
I have a collection of models of the following type:
class Model {
final long timestamp;
final Object data;
public Model(long timestamp, Object data) {
this.timestamp = timestamp;
this.data = data;
}
}
This collection is sorted in ascending order (sorted by timestamp).
My goal - is to group them by "sequences". "Sequence" - is sequence of elements where each element is really close to its neighbor:
----A-B-C-----D-E-F---H-I--->
In this case I have 3 "sequences". Position on the axis is defined by Model's timestamp
property (not the emission time). Max distance to form a sequence should be configurable.
Or let's take more real example:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
In this case for max distance 10ms, I would get 3 sequences - (0,5,10) , (100,108,111,115) , (200,201,202)
This logic is really similar to debounce
operator. But instead of debouncing by real time, I need to debounce by some custom property.
This is how I would do that if timestamp would represent the emission time:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
Observable<Model> modelsObservable = Observable.from(models).share();
modelsObservable.buffer(modelsObservable.debounce(10, TimeUnit.MILLISECONDS))
.subscribe(group -> {
//this is one of my groups
});
It is not necessarily needs to be a debounce - I was also looking at groupBy
operator, but I couldn't figure out the proper grouping criteria..
A bit unconventional, but you could use the
TestScheduler
here, schedule the value emission by the data value, then use the debounce-trick with this scheduler and move the virtual time ahead.(There were attempts at implementing a Virtual-time scheduler in RxJava but the discussion was abandoned and the proposed implementation rejected.)
I wouldn't fiddle with the schedulers, but leverage Buffer/Window (depending if you need downstream observables or collections) and Scan.
In Rx.Net you can achieve it with:
Results in:
Publish/Connect can probably be skipped if your source observable is hot. rx-java possesses the same operators, but not the anonymous types, I guess they can be replaced either by tuple or a concrete class.