I have an Observable which is emitting a number of objects and I want to group these objects using the window
or buffer
operations. However, instead of specifying a count
parameter for determining how many objects should be in window I want to be able to use a custom criteria.
For example suppose the observable is emitting instances of a Message
class like below.
class Message(
val int size: Int
)
I would like to buffer or window the message instances based on their size
variable not just their counts. For example, to gain windows of messages with a total size of at most 5000.
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)
Is there an easy way to do this?
First I have to confess, that I'm not a RxJava expert. I just found your question challenging and tried to find a solution.
There is a
window()
function with a parameterboundaryIndicator
. You have to create aPublisher
/Flowable
that emits an item, if the window size is reached.In the example I created an object
windowManager
that is used as theboundaryIndicator
. In theonNext
callback I invoke thewindowManager
and give it a chance to open a new window.