RxJava debounce by arbitrary value

Posted 2019-07-12 11:42

Question:

I'm still figuring out the proper use of the different Rx* operators and stumbled upon the following problem:

I have a collection of models of the following type:

class Model {

    final long timestamp;
    final Object data;

    public Model(long timestamp, Object data) {
        this.timestamp = timestamp;
        this.data = data;
    }
}

This collection is sorted by timestamp in ascending order.

My goal is to group them into "sequences". A "sequence" is a run of elements in which each element is really close to its neighbor:

----A-B-C-----D-E-F---H-I--->

In this case I have 3 "sequences". Position on the axis is defined by the Model's timestamp property (not the emission time). The maximum distance between neighbors within a sequence should be configurable.

Or, to take a more realistic example:

List<Model> models = new ArrayList<Model>(10) {{
    add(new Model(0, null));
    add(new Model(5, null));
    add(new Model(10, null));
    add(new Model(100, null));
    add(new Model(108, null));
    add(new Model(111, null));
    add(new Model(115, null));
    add(new Model(200, null));
    add(new Model(201, null));
    add(new Model(202, null));
}};

In this case, for a max distance of 10 ms, I would get 3 sequences: (0,5,10), (100,108,111,115), (200,201,202).
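To make the intended grouping concrete, here is a minimal plain-Java sketch without any Rx operators. It assumes bare timestamps stand in for Model objects and that a gap exactly equal to the limit still belongs to the same sequence (the question does not say which way that case should go). GapGrouping and groupByGap are hypothetical names, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: splits ascending timestamps into
// runs whose neighbouring elements are at most maxGap apart.
class GapGrouping {

    static List<List<Long>> groupByGap(long[] sortedTimestamps, long maxGap) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sortedTimestamps) {
            // Close the running group when the distance to the previous
            // element exceeds maxGap (assumption: equal to maxGap still joins).
            if (!current.isEmpty() && t - current.get(current.size() - 1) > maxGap) {
                groups.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            groups.add(current); // flush the trailing group
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 5, 10, 100, 108, 111, 115, 200, 201, 202};
        // prints [[0, 5, 10], [100, 108, 111, 115], [200, 201, 202]]
        System.out.println(groupByGap(timestamps, 10));
    }
}
```

Any Rx-based solution should reproduce this grouping, including the final group, which is easy to lose when a buffer is only closed by the next boundary.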

This logic is really similar to the debounce operator, but instead of debouncing by real time, I need to debounce by some custom property.

This is how I would do it if the timestamp represented the emission time:

List<Model> models = new ArrayList<Model>(10) {{
    add(new Model(0, null));
    add(new Model(5, null));
    add(new Model(10, null));
    add(new Model(100, null));
    add(new Model(108, null));
    add(new Model(111, null));
    add(new Model(115, null));
    add(new Model(200, null));
    add(new Model(201, null));
    add(new Model(202, null));
}};

Observable<Model> modelsObservable = Observable.from(models).share();

modelsObservable.buffer(modelsObservable.debounce(10, TimeUnit.MILLISECONDS))
        .subscribe(group -> {
            //this is one of my groups
        });

It doesn't necessarily need to be debounce. I was also looking at the groupBy operator, but I couldn't figure out the proper grouping criteria.
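One possible grouping criterion is a running sequence index. Membership depends on the previous element, so the key cannot be computed from a single element and has to be carried along as state. The sketch below shows this in plain Java rather than with groupBy itself. SequenceKeyGrouping, sequenceKeys and groupByKey are hypothetical names, and bare timestamps stand in for Model objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of RxJava.
class SequenceKeyGrouping {

    // Assigns each timestamp the index of the sequence it belongs to.
    // The index increases by one whenever the gap to the previous
    // timestamp exceeds maxGap.
    static int[] sequenceKeys(long[] sortedTimestamps, long maxGap) {
        int[] keys = new int[sortedTimestamps.length];
        int key = 0;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            if (sortedTimestamps[i] - sortedTimestamps[i - 1] > maxGap) {
                key++;
            }
            keys[i] = key;
        }
        return keys;
    }

    // Groups the timestamps by the keys computed above; TreeMap keeps the
    // groups in sequence order.
    static Map<Integer, List<Long>> groupByKey(long[] sortedTimestamps, long maxGap) {
        int[] keys = sequenceKeys(sortedTimestamps, maxGap);
        Map<Integer, List<Long>> groups = new TreeMap<>();
        for (int i = 0; i < sortedTimestamps.length; i++) {
            groups.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(sortedTimestamps[i]);
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 5, 10, 100, 108, 111, 115, 200, 201, 202};
        // prints {0=[0, 5, 10], 1=[100, 108, 111, 115], 2=[200, 201, 202]}
        System.out.println(groupByKey(timestamps, 10));
    }
}
```

In an Rx pipeline the same key would presumably have to be computed in a stateful step (for example a scan) before grouping; that mapping is an assumption and is not shown here.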

Answer 1:

I wouldn't fiddle with the schedulers, but leverage Buffer/Window (depending on whether you need collections or downstream observables) and Scan.

In Rx.NET you can achieve it with:

        var models = new[] { 0, 5, 10, 100, 108, 111, 115, 200, 201, 202 }
            .ToObservable();

        var enrichedModels = models.Scan(
            new { Current = -1, Prev = -1 },
            (acc, cur) => new { Current = cur, Prev = acc.Current })
            .Skip(1).Publish();

        enrichedModels.Buffer(() => enrichedModels.SkipWhile(em => em.Current < em.Prev + 10))
            .Select(seq => seq.Select(em => em.Prev))
            .Subscribe(seq =>
            {
                Console.WriteLine(String.Join(",", seq));
            });

        enrichedModels.Connect();

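Results in (the trailing 202 is not printed, because each buffer is projected onto Prev and the last element only ever appears as Current):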

0,5,10
100,108,111,115
200,201

Publish/Connect can probably be skipped if your source observable is hot. RxJava has the same operators but no anonymous types; I guess they can be replaced by either a tuple or a concrete class.



Answer 2:

A bit unconventional, but you could use the TestScheduler here: schedule each value's emission at its timestamp, then use the debounce trick with this scheduler and move the virtual time ahead.

TestScheduler s = new TestScheduler();
Scheduler.Worker w = s.createWorker();

PublishSubject<Object> subject = PublishSubject.create();

for (Model m : models) {
    w.schedule(() -> subject.onNext(m.data), 
        m.timestamp, TimeUnit.MILLISECONDS);
}

subject.buffer(subject.debounce(10, TimeUnit.MILLISECONDS, s))
    .subscribe(list -> ...);

s.advanceTimeBy(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS);

w.unsubscribe();

(There were attempts at implementing a Virtual-time scheduler in RxJava but the discussion was abandoned and the proposed implementation rejected.)
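
To illustrate the virtual-time idea without depending on RxJava, here is a minimal sketch that loosely mimics it: tasks are queued at their virtual time and run in order with no real waiting, and each item arms a "quiet period" flush that only fires if no later item arrived. VirtualTimeDebounce and its methods are hypothetical names; this is not how TestScheduler or debounce are implemented. It also assumes that an item arriving exactly when a flush is due still joins the current group.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for a virtual-time scheduler, not an RxJava class.
class VirtualTimeDebounce {

    // A task due at a virtual time; seq preserves insertion order for ties.
    private static final class Task {
        final long time;
        final long seq;
        final Runnable action;

        Task(long time, long seq, Runnable action) {
            this.time = time;
            this.seq = seq;
            this.action = action;
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>(
            Comparator.comparingLong((Task t) -> t.time).thenComparingLong(t -> t.seq));
    private long nextSeq = 0;

    void schedule(long time, Runnable action) {
        queue.add(new Task(time, nextSeq++, action));
    }

    // Runs every queued task in virtual-time order; nothing actually sleeps.
    void runAll() {
        while (!queue.isEmpty()) {
            queue.poll().action.run();
        }
    }

    static List<List<Long>> group(long[] timestamps, long quietPeriod) {
        VirtualTimeDebounce clock = new VirtualTimeDebounce();
        List<List<Long>> groups = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        long[] version = {0}; // bumped on every item to invalidate older flushes

        for (long t : timestamps) {
            clock.schedule(t, () -> {
                buffer.add(t);
                long mine = ++version[0];
                // Flush only if no newer item arrived during the quiet period.
                clock.schedule(t + quietPeriod, () -> {
                    if (version[0] == mine) {
                        groups.add(new ArrayList<>(buffer));
                        buffer.clear();
                    }
                });
            });
        }
        clock.runAll();
        return groups;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 5, 10, 100, 108, 111, 115, 200, 201, 202};
        // prints [[0, 5, 10], [100, 108, 111, 115], [200, 201, 202]]
        System.out.println(group(timestamps, 10));
    }
}
```

Because the last item's flush is itself a queued task, the final group is emitted once the queue drains, which plays the role of advancing the virtual clock far enough in the TestScheduler version.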