RxJava buffer/window with custom counting criteria

Posted 2019-08-06 18:52

I have an Observable that emits a number of objects, and I want to group these objects using the window or buffer operators. However, instead of specifying a count parameter to determine how many objects go into each window, I want to be able to use a custom criterion.

For example, suppose the observable emits instances of a Message class like the one below.

class Message(
    val size: Int
)

I would like to buffer or window the message instances based on their size property, not just on how many there are. For example, I would like windows of messages whose total size is at most 5000.

// Something like this
readMessages()
    .buffer({ message -> message.size }, 5000)

Is there an easy way to do this?

1 answer
Rolldiameter
#2 · 2019-08-06 19:36

First, I have to confess that I'm not an RxJava expert. I just found your question challenging and tried to find a solution.

There is a window() overload that takes a boundaryIndicator parameter. You have to create a Publisher (for example a Flowable) that emits an item whenever the size limit is reached; each emission closes the current window and opens a new one.

In the example I created an object windowManager that is used as the boundaryIndicator. In the onNext callback I invoke the windowManager and give it a chance to open a new window.

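// Snippet meant to run inside a function body (e.g. main). It needs Flowable,
// FlowableEmitter and BackpressureStrategy from RxJava, plus java.util.concurrent.TimeUnit.

// Tracks the accumulated size and signals a window boundary once it is exceeded.
val windowManager = object {
    lateinit var emitter: FlowableEmitter<Unit>
    var windowSize: Long = 0

    // Captures the emitter when window() subscribes to the boundary Flowable.
    fun createEmitter(emitter: FlowableEmitter<Unit>) {
        this.emitter = emitter
    }

    fun openWindowIfRequired(size: Long) {
        windowSize += size
        if (windowSize > 5) { // demo threshold; the question's limit would be 5000
            windowSize = 0
            emitter.onNext(Unit)
        }
    }
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

// interval() emits 0, 1, 2, ...; each value doubles as the item's "size" in this demo.
Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe { window ->
    window.doOnNext { value ->
        windowManager.openWindowIfRequired(value)
    }.doOnSubscribe {
        println("Open window")
    }.doOnComplete {
        println("Close window")
    }.subscribe { value ->
        println(value)
    }
}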
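
Note that the windowing code above only signals a boundary after the running total has already passed the threshold, so a window can end up larger than the limit. To make the "total size of at most 5000" rule itself concrete, here is a minimal sketch of the grouping logic in plain Kotlin, independent of RxJava. The helper name chunkBySize and its parameters are hypothetical (they are not part of any library), and Message simply mirrors the class from the question. The same check could presumably drive the boundary signal, but I have not wired it into the Flowable version.

```kotlin
// Mirrors the Message class from the question.
data class Message(val size: Int)

// Hypothetical helper (not part of RxJava): splits items into consecutive
// groups whose summed size stays at or below maxTotal. An item that is
// larger than maxTotal on its own still gets a group to itself.
fun <T> chunkBySize(items: Iterable<T>, maxTotal: Int, sizeOf: (T) -> Int): List<List<T>> {
    val groups = mutableListOf<List<T>>()
    var current = mutableListOf<T>()
    var total = 0
    for (item in items) {
        val size = sizeOf(item)
        // Close the current group before adding an item that would overflow it.
        if (current.isNotEmpty() && total + size > maxTotal) {
            groups.add(current)
            current = mutableListOf()
            total = 0
        }
        current.add(item)
        total += size
    }
    // Keep the trailing partial group, if any.
    if (current.isNotEmpty()) groups.add(current)
    return groups
}

fun main() {
    val messages = listOf(2000, 2500, 1000, 4000, 6000, 500).map(::Message)
    val groups = chunkBySize(messages, 5000) { it.size }
    groups.forEach { group -> println(group.map { it.size }) }
}
```

With the sample sizes above, the groups come out as [2000, 2500], [1000, 4000], [6000] and [500]. The key difference from the window example is that the check happens before the item is added, so the limit is only exceeded by a single oversized message.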