Rx - Divide stream into segments (lists) by condit

Posted 2019-04-21 23:56

I have an RX producer that creates a stream of strings like so (simplified version of the real stream):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

The stream is endless, but ordered. So after the strings that start with A run out, B starts. When B runs out, C starts... when Z runs out, we move to AA1 etc. There's an unknown number of A's, B's etc, but it's typically 10-30 instances per letter.

I'm looking for a way to divide this stream into blocks of all A's: A1 A2 A3, all B's: B1 B2, all C's: C1 C2 C3 C4 C5 C6 etc. Each block can be either an observable (which I'll turn into a list) or simply a list.

I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:

  • Group by: since the stream is endless, the per-letter observable doesn't complete. So when the A's run out and the B's start, A's Observable doesn't complete. So there's an ever increasing number of observables.

  • Window/Buffer with distinctUntilChanged - I use "distinctUntilChanged" on the original stream to output the first item of each group (the first A, first B etc). Then I use that stream as an input to window or a "buffer" operator to be used as a boundary between windows/buffers. That didn't work and all I got was empty lists.

What's a correct solution using RX? I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.

4 Answers
霸刀☆藐视天下
#2 · 2019-04-22 00:35

After looking at akarnokd's and Dave's answers, I came up with my own solution by implementing a custom Rx Operator called BufferWhile. It seems to work just as well as the other solutions (someone please correct me if I'm wrong), but it seems more straightforward:

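import java.util.ArrayList;
import java.util.List;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T> {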

    private final Func1<? super T, ? extends U> keyGenerator;

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
        return new Subscriber<T>(s) {

            private ArrayList<T> buffer = null;
            private U currentKey = null;

            @Override
            public void onCompleted() {
                submitAndClearBuffer();
                s.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case
                s.onError(e);
            }

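            @Override
            public void onNext(T t) {
                U key = keyGenerator.call(t); // Compute the key once per item
                boolean emitted = false;
                if (currentKey == null || !currentKey.equals(key)) {
                    currentKey = key;
                    emitted = submitAndClearBuffer();
                }
                buffer.add(t);
                if (!emitted) {
                    // Request additional T since we "swallowed" the incoming item without emitting anything downstream
                    // (this includes the very first item, when there is no previous buffer to emit)
                    request(1);
                }
            }

            // Emits the current buffer if it is non-empty and starts a new one; returns true if a list was emitted
            private boolean submitAndClearBuffer() {
                boolean emitted = buffer != null && !buffer.isEmpty();
                if (emitted) {
                    s.onNext(buffer);
                }
                buffer = new ArrayList<>();
                return emitted;
            }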
        };
    }
}

I can apply this operator to the original observable using lift, and get an observable that emits lists of strings.

Anthone
#3 · 2019-04-22 00:39

Assume we have a source stream of strings and a function key that extracts the key for each string, such as the following:

IObservable<string> source = ...;
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray());

Then we can use Buffer with a custom closing selector.

var query = source.Publish(o => o.Buffer(() =>
{
    var keys = o.Select(key);
    return Observable
        .CombineLatest(
            keys.Take(1),
            keys.Skip(1),
            (a, b) => a != b)
        .Where(x => x);
}));

Each buffer ends when the first item in the buffer and the current item that we are considering adding to the buffer do not have the same key.
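
Since the question asks for Java, the closing rule described above can be sketched without Rx on a plain Iterable. This is only an illustration of the rule, not a translation of the Buffer query; the class name KeySegmentSketch and the helpers letterPrefix and segmentByKey are hypothetical names introduced for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class KeySegmentSketch {

    // Hypothetical helper mirroring the C# key function: the leading run of
    // letters, so "A1" -> "A" and "AA12" -> "AA".
    static String letterPrefix(String s) {
        int i = 0;
        while (i < s.length() && Character.isLetter(s.charAt(i))) {
            i++;
        }
        return s.substring(0, i);
    }

    // Closes the current segment when the key of the incoming item differs
    // from the key of the first item already in the segment.
    static <T, K> List<List<T>> segmentByKey(Iterable<T> items,
                                             Function<? super T, ? extends K> key) {
        List<List<T>> segments = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            if (!current.isEmpty()
                    && !key.apply(current.get(0)).equals(key.apply(item))) {
                segments.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            segments.add(current); // trailing segment of a finite input
        }
        return segments;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("A1", "A2", "B1", "Z1", "AA1", "AA2");
        // prints [[A1, A2], [B1], [Z1], [AA1, AA2]]
        System.out.println(segmentByKey(input, KeySegmentSketch::letterPrefix));
    }
}
```

Taking the whole letter prefix as the key, rather than only the first character, keeps blocks such as AA and AB apart once the stream moves past Z.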

啃猪蹄的小仙女
#4 · 2019-04-22 00:50

Here is a way I'd solve this:

Observable<String> source = Observable.just(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    List<String> buffer = new ArrayList<>();
    return 
            Observable.concat(
                source.concatMap(new Function<String, Observable<List<String>>>() {
                    String lastKey;
                    @Override
                    public Observable<List<String>> apply(String t) {
                        String key = t.substring(0, 1);
                        if (lastKey != null && !key.equals(lastKey)) {
                            List<String> b = new ArrayList<>(buffer);
                            buffer.clear();
                            buffer.add(t);
                            lastKey = key;
                            return Observable.just(b);
                        }
                        lastKey = key;
                        buffer.add(t);
                        return Observable.empty();
                    }
                }),
                Observable.just(1)
                .flatMap(v -> {
                    if (buffer.isEmpty()) {
                        return Observable.empty();
                    }
                    return Observable.just(buffer);
                })
            );
    }
);

output.subscribe(System.out::println);

This is how it works:

  • I'm using defer because we need a per-subscriber buffer, not a global one.
  • I concatenate the buffering with the emission of the last buffer, in case the source happens to be finite.
  • I use concatMap and add each item to a buffer, emitting empty Observables as long as the key stays the same. Once the key changes, I emit the contents of the buffer and start a new one.
爱情/是我丢掉的垃圾
#5 · 2019-04-22 00:53

You can use Transformers.toListWhile from rxjava-extras:

Observable<String> source = 
    Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
            (list, t) -> list.isEmpty() 
                         || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);

It does what @akarnokd has done under the covers and is unit tested.
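
To make the role of the (list, t) predicate concrete, here is a minimal plain-Java sketch of the behaviour the snippet above relies on: items are appended to the current list while the predicate holds, and a failing predicate closes the list and starts a new one with that item. It is an assumption-based model, not the rxjava-extras implementation; ToListWhileSketch, letters and this toListWhile method are hypothetical names. The predicate compares the whole letter prefix rather than charAt(0), because the stream in the question eventually reaches keys like AA and AB, which share a first character.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class ToListWhileSketch {

    // Hypothetical helper: leading ASCII letters of an item, e.g. "AB7" -> "AB".
    static String letters(String s) {
        return s.replaceAll("[^A-Za-z].*$", "");
    }

    // Appends each item to the current list while condition(list, item) is true;
    // when it is false, the list is closed and the item starts a new list.
    static <T> List<List<T>> toListWhile(Iterable<T> items,
                                         BiPredicate<List<T>, T> condition) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            if (!condition.test(current, item)) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            result.add(current); // last list of a finite input
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("Z1", "Z2", "AA1", "AA2", "AB1");
        // prints [[Z1, Z2], [AA1, AA2], [AB1]]
        System.out.println(toListWhile(input,
                (list, t) -> list.isEmpty()
                        || letters(list.get(0)).equals(letters(t))));
    }
}
```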
