I have an RX producer that creates a stream of strings like so (simplified version of the real stream):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
The stream is endless, but ordered. So after the strings that start with A run out, B starts. When B runs out, C starts... when Z runs out, we move to AA1 etc. There's an unknown number of A's, B's etc, but it's typically 10-30 instances per letter.
I'm looking for a way to divide this stream into blocks of all A's: A1 A2 A3, all B's: B1 B2, all C's: C1 C2 C3 C4 C5 C6 etc. Each block can be either an observable (which I'll turn into a list) or simply a list.
I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:
Group by: since the stream is endless, the per-letter observable doesn't complete. So when the A's run out and the B's start, A's Observable doesn't complete. So there's an ever increasing number of observables.
Window/Buffer with distinctUntilChanged - I use "distinctUntilChanged" on the original stream to output the first item of each group (the first A, first B etc). Then I use that stream as an input to a "window" or a "buffer" operator to be used as a boundary between windows/buffers. That didn't work and all I got was empty lists.
What's a correct solution using RX?
I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.
Here is a way I'd solve this:
// Assumes RxJava 2.x
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import java.util.ArrayList;
import java.util.List;

Observable<String> source = Observable.fromArray(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    // One buffer per subscriber
    List<String> buffer = new ArrayList<>();
    return Observable.concat(
        source.concatMap(new Function<String, Observable<List<String>>>() {
            String lastKey;
            @Override
            public Observable<List<String>> apply(String t) {
                // Single-letter key; use the full letter prefix if keys like AA occur
                String key = t.substring(0, 1);
                if (lastKey != null && !key.equals(lastKey)) {
                    // Key changed: emit a copy of the finished block and start a new one
                    List<String> b = new ArrayList<>(buffer);
                    buffer.clear();
                    buffer.add(t);
                    lastKey = key;
                    return Observable.just(b);
                }
                lastKey = key;
                buffer.add(t);
                return Observable.empty();
            }
        }),
        // Runs only after the source completes: emit the last block, if any
        Observable.just(1)
            .flatMap(v -> {
                if (buffer.isEmpty()) {
                    return Observable.empty();
                }
                return Observable.just(buffer);
            })
    );
});

output.subscribe(System.out::println);
This is how it works:
- I'm using defer because we need a per-subscriber buffer, not a global one
- I concatenate the buffering with the emission of the last buffer, in case the source happens to be finite
- I use concatMap and add to a buffer until the key changes; until then, I emit empty Observables. Once the key changes, I emit the contents of the buffer and start a new one.
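The same idea, stripped of Rx, can be sketched in plain Java. This is only an illustration under assumptions: the class KeyChangeBuffer and its methods offer and flush are hypothetical names, not part of RxJava. Here offer plays the role of the concatMap function, and flush the role of the trailing Observable.just(1).flatMap(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava: collects items until the key changes.
final class KeyChangeBuffer<T, K> {
    private final Function<? super T, ? extends K> keyOf;
    private final List<T> buffer = new ArrayList<>();
    private K lastKey;

    KeyChangeBuffer(Function<? super T, ? extends K> keyOf) {
        this.keyOf = keyOf;
    }

    // Returns the finished block when the key changes, otherwise an empty Optional.
    Optional<List<T>> offer(T item) {
        K key = keyOf.apply(item);
        Optional<List<T>> finished = Optional.empty();
        if (!buffer.isEmpty() && !key.equals(lastKey)) {
            // Hand out a copy so the caller never sees later mutations.
            finished = Optional.of(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastKey = key;
        buffer.add(item);
        return finished;
    }

    // Returns whatever is still buffered; only meaningful once a finite source has ended.
    List<T> flush() {
        List<T> rest = new ArrayList<>(buffer);
        buffer.clear();
        return rest;
    }
}
```

Feeding A1 A2 A3 B1 B2 C1 through offer and then calling flush yields the blocks [A1, A2, A3], [B1, B2] and [C1].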
You can use toListWhile from rxjava-extras:
Observable<String> source =
        Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
            (list, t) -> list.isEmpty()
                || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);
It does what @akarnokd has done under the covers and is unit tested.
Assume we have a source stream of strings and a function key that extracts the key for each string, such as the following:
IObservable<string> source = ...;
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray());
Then we can use Buffer with a custom closing selector.
var query = source.Publish(o => o.Buffer(() =>
{
    var keys = o.Select(key);
    return Observable
        .CombineLatest(
            keys.Take(1),
            keys.Skip(1),
            (a, b) => a != b)
        .Where(x => x);
}));
Each buffer ends when the first item in the buffer and the current item that we are considering adding to the buffer do not have the same key.
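For readers porting this to Java, the key function and the closing condition might look like the sketch below. The names KeyDemo, leadingLetters and closesBuffer are hypothetical, and the sketch assumes that every key is the run of letters at the start of the string, so that AA1 gets the key AA rather than A.

```java
import java.util.List;

final class KeyDemo {
    // Key = leading run of letters, e.g. "A" for "A12" and "AA" for "AA1".
    static String leadingLetters(String s) {
        int i = 0;
        while (i < s.length() && Character.isLetter(s.charAt(i))) {
            i++;
        }
        return s.substring(0, i);
    }

    // A buffer closes when the candidate's key differs from the key of the buffer's first item.
    static boolean closesBuffer(List<String> buffer, String candidate) {
        return !buffer.isEmpty()
                && !leadingLetters(buffer.get(0)).equals(leadingLetters(candidate));
    }
}
```

Comparing whole letter prefixes matters once the stream moves past Z: a first-character comparison would treat A1 and AA1 as the same key.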
After looking at akarnokd's and Dave's answers, I came up with my own solution by implementing a custom Rx Operator called BufferWhile. It seems to work just as well as the other solutions (someone please correct me if I'm wrong), but it seems more straightforward:
// Assumes RxJava 1.x
import java.util.ArrayList;
import java.util.List;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T> {
    private final Func1<? super T, ? extends U> keyGenerator;

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
        return new Subscriber<T>(s) {
            private List<T> buffer = new ArrayList<>();
            private U currentKey = null;

            @Override
            public void onCompleted() {
                submitAndClearBuffer();
                s.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                submitAndClearBuffer(); // Optional, remove if submitting partial buffers doesn't make sense in your case
                s.onError(e);
            }

            @Override
            public void onNext(T t) {
                U key = keyGenerator.call(t); // Compute the key once per item
                if (currentKey == null || !currentKey.equals(key)) {
                    currentKey = key;
                    boolean emitted = submitAndClearBuffer();
                    buffer.add(t);
                    if (!emitted) {
                        request(1); // Very first item: nothing went downstream, so ask for another T
                    }
                } else {
                    buffer.add(t);
                    request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers
                }
            }

            // Emits the buffer if it is non-empty and starts a new one; returns whether anything was emitted
            private boolean submitAndClearBuffer() {
                boolean emitted = !buffer.isEmpty();
                if (emitted) {
                    s.onNext(buffer);
                }
                buffer = new ArrayList<>();
                return emitted;
            }
        };
    }
}
I can apply this operator on the original observable using lift, and get an observable that emits lists of strings.