I have an Observable which is emitting a number of objects and I want to group these objects using the window
or buffer
operations. However, instead of specifying a count
parameter for determining how many objects should be in window I want to be able to use a custom criteria.
For example suppose the observable is emitting instances of a Message
class like below.
class Message(
val int size: Int
)
I would like to buffer or window the message instances based on their size
variable not just their counts. For example, to gain windows of messages with a total size of at most 5000.
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)
Is there an easy way to do this?
First I have to confess, that I'm not a RxJava expert.
I just found your question challenging and tried to find a solution.
There is a window()
function with a parameter boundaryIndicator
. You have to create a Publisher
/ Flowable
that emits an item, if the window size is reached.
In the example I created an object windowManager
that is used as the boundaryIndicator
. In the onNext
callback I invoke the windowManager
and give it a chance to open a new window.
val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0
fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}
fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}
val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}