I have an RX producer that creates a stream of strings like so (simplified version of the real stream):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
The stream is endless, but ordered. So after the strings that start with A
run out, B
starts. WhenB
runs out, C
starts... whenZ
run out, we move to AA1
etc. There's an unknown number of A
's, B
's etc, but it's typically 10-30 instances per letter.
I'm looking for a way to divide this stream into blocks of all A's: A1 A2 A3
, all B's: B1 B2
, all C's: C1 C2 C3 C4 C5 C6
etc. Each block can be either an observable (which I'll turn into a list) or simply a list.
I tried several different approaches using RxJava, all of which failed. Among the things that didn't work are:
Group by: since the stream is endless, the per-letter observable doesn't complete. So when the A's run out and the B's start, A's Observable doesn't complete. So there's an ever increasing number of observables.
Window/Buffer with distinctUntilChanged - I use "distinctUntilChanged" on the original stream to output the first item of each group (the first A, first B etc). Then I use that stream as an input to
window
or a "buffer" operator to be used as a boundary between windows/buffers. That didn't work and all I got was empty lists.
What's a correct solution using RX? I'd prefer a Java solution, but solutions in other implementations of RX that can be easily converted to Java are also very welcome.
After looking at akarnokd's and Dave's answers, I came up with my own solution by implementing a custom Rx Operator called
BufferWhile
. It seems to work just as well as the other solutions (someone please correct me if I'm wrong), but it seems more straight forward:I can apply this operator on the original observable using
lift
, and get an observable that emits lists of string.Assume we have a
source
stream ofstring
and a functionkey
that extracts the key for eachstring
, such as the following:Then we can use
Buffer
with a custom closing selector.Each buffer ends when the first item in the buffer and the current item that we are considering adding to the buffer do not have the same key.
Here is a way I'd solve this:
This is how it works:
You can use rxjava-extras
.toListWhile
:It does what @akarnokd has done under the covers and is unit tested.