Rx - Divide stream into segments (lists) by condit

Posted 2019-04-22 00:46

Question:

I have an RX producer that creates a stream of strings like so (simplified version of the real stream):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

The stream is endless, but ordered. After the strings that start with A run out, B starts. When B runs out, C starts... when Z runs out, we move to AA1 etc. The number of A's, B's etc. is unknown, but it's typically 10-30 instances per letter.

I'm looking for a way to divide this stream into blocks of all A's: A1 A2 A3, all B's: B1 B2, all C's: C1 C2 C3 C4 C5 C6 etc. Each block can be either an observable (which I'll turn into a list) or simply a list.

I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:

  • Group by: since the stream is endless, the per-letter observable doesn't complete. So when the A's run out and the B's start, A's Observable doesn't complete. So there's an ever increasing number of observables.

  • Window/Buffer with distinctUntilChanged - I use "distinctUntilChanged" on the original stream to output the first item of each group (the first A, first B etc). Then I use that stream as an input to window or a "buffer" operator to be used as a boundary between windows/buffers. That didn't work and all I got was empty lists.

What's a correct solution using RX? I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.

Answer 1:

Here is a way I'd solve this:

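// Assumes RxJava 2.x: io.reactivex.Observable, io.reactivex.functions.Function,
// plus java.util.ArrayList and java.util.List.
// RxJava 2.x has no varargs Observable.from(...); fromArray is the equivalent.
Observable<String> source = Observable.fromArray(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");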

Observable<List<String>> output = Observable.defer(() -> {
    List<String> buffer = new ArrayList<>();
    return 
            Observable.concat(
                source.concatMap(new Function<String, Observable<List<String>>>() {
                    String lastKey;
                    @Override
                    public Observable<List<String>> apply(String t) {
                        String key = t.substring(0, 1);
                        if (lastKey != null && !key.equals(lastKey)) {
                            List<String> b = new ArrayList<>(buffer);
                            buffer.clear();
                            buffer.add(t);
                            lastKey = key;
                            return Observable.just(b);
                        }
                        lastKey = key;
                        buffer.add(t);
                        return Observable.empty();
                    }
                }),
                Observable.just(1)
                .flatMap(v -> {
                    if (buffer.isEmpty()) {
                        return Observable.empty();
                    }
                    return Observable.just(buffer);
                })
            );
    }
);

output.subscribe(System.out::println);

This is how it works:

  • I'm using defer because we need a per-subscriber buffer, not a global one
  • I concatenate the buffering with the emission of the last buffer if the source happens to be finite
  • I use concatMap and add each item to the buffer for as long as the key stays the same, emitting an empty Observable for each of those items. Once the key changes, I emit a copy of the buffer's contents and start a new buffer with the current item.
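The buffering logic described in the bullets above can also be read as a plain loop. The following is only a sketch of that state machine over a finite list, using nothing but the JDK; the class name KeyChangeBufferSketch and the method split are made up for illustration and are not part of RxJava or of the answer's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only: no RxJava types are involved.
class KeyChangeBufferSketch {

    // Splits an ordered sequence into runs that share the same first character.
    static List<List<String>> split(Iterable<String> source) {
        List<List<String>> result = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        String lastKey = null;
        for (String item : source) {
            String key = item.substring(0, 1);
            if (lastKey != null && !key.equals(lastKey)) {
                // Key changed: hand over a copy of the finished run, then reuse the buffer.
                result.add(new ArrayList<>(buffer));
                buffer.clear();
            }
            lastKey = key;
            buffer.add(item);
        }
        // Finite source: flush the last, still-open run.
        if (!buffer.isEmpty()) {
            result.add(buffer);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(
                "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1")));
        // prints [[A1, A2, A3], [B1, B2, B3], [C1], [D1]]
    }
}
```

The final flush corresponds to the second Observable passed to concat; on a truly endless stream it would never run.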


Answer 2:

You can use rxjava-extras .toListWhile:

Observable<String> source = 
    Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
            (list, t) -> list.isEmpty() 
                         || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);

It does under the covers what @akarnokd's answer does by hand, and it is unit tested.
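To make the role of the (list, t) predicate concrete, here is a rough JDK-only sketch of the semantics as I understand them: an item joins the current list while the predicate returns true, otherwise the list is emitted and the item starts a new one. This is not the rxjava-extras implementation; the class ToListWhileSketch and its static toListWhile method are hypothetical stand-ins that work on a finite Iterable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical stand-in for the operator's behavior, not the library code.
class ToListWhileSketch {

    static <T> List<List<T>> toListWhile(Iterable<T> source,
                                         BiPredicate<List<T>, T> condition) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            // The predicate sees the list collected so far and the incoming item.
            // It is expected to accept any item when the list is empty.
            if (!condition.test(current, item)) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        // Emit whatever is left when a finite source ends.
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
        System.out.println(toListWhile(source,
                (list, t) -> list.isEmpty() || list.get(0).charAt(0) == t.charAt(0)));
        // prints [[A1, A2, A3], [B1, B2, B3], [C1], [D1]]
    }
}
```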



Answer 3:

Assume we have a source stream of strings and a function key that extracts the key for each string, such as the following:

IObservable<string> source = ...;
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray());

Then we can use Buffer with a custom closing selector.

var query = source.Publish(o => o.Buffer(() =>
{
    var keys = o.Select(key);
    return Observable
        .CombineLatest(
            keys.Take(1),
            keys.Skip(1),
            (a, b) => a != b)
        .Where(x => x);
}));

Each buffer ends when the first item in the buffer and the item currently being considered for it do not have the same key.
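For readers converting this to Java, the key function above could look roughly like the sketch below. It takes the leading letters of the string, which matters once the stream reaches prefixes like AA1, where a first-character key would no longer tell AA and AB apart. The class name LeadingLettersKey is made up for illustration.

```java
// Hypothetical Java counterpart of the C# key function: returns the leading letters.
class LeadingLettersKey {

    static String key(String s) {
        int end = 0;
        // Advance while the characters are letters, like TakeWhile(char.IsLetter).
        while (end < s.length() && Character.isLetter(s.charAt(end))) {
            end++;
        }
        return s.substring(0, end);
    }

    public static void main(String[] args) {
        System.out.println(key("A1"));   // prints A
        System.out.println(key("AA12")); // prints AA
    }
}
```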



Answer 4:

After looking at akarnokd's and Dave's answers, I came up with my own solution by implementing a custom Rx Operator called BufferWhile. It seems to work just as well as the other solutions (someone please correct me if I'm wrong), but it seems more straightforward:

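import java.util.ArrayList;
import java.util.List;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T> {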

    private final Func1<? super T, ? extends U> keyGenerator;

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
        return new Subscriber<T>(s) {

            private ArrayList<T> buffer = null;
            private U currentKey = null;

            @Override
            public void onCompleted() {
                submitAndClearBuffer();
                s.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case
                s.onError(e);
            }

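            @Override
            public void onNext(T t) {
                U key = keyGenerator.call(t); // Compute the key once per item
                if (currentKey == null || !currentKey.equals(key)) {
                    currentKey = key;
                    boolean emitted = submitAndClearBuffer();
                    buffer.add(t);
                    if (!emitted) {
                        request(1); // Nothing was sent downstream (e.g. very first item), so ask for one more T
                    }
                } else {
                    buffer.add(t);
                    request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers
                }
            }

            // Emits the current buffer if it has content; returns true if something was emitted
            private boolean submitAndClearBuffer() {
                boolean emitted = buffer != null && !buffer.isEmpty();
                if (emitted) {
                    s.onNext(buffer);
                }
                buffer = new ArrayList<>();
                return emitted;
            }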
        };
    }
}

I can apply this operator to the original observable using lift, and get an observable that emits lists of strings.