Execute a piece of code when Kotlin channel become

2019-07-26 23:36发布

I'd like to write a simple batch processor class. It has a request queue and waits this queue to become full or some amount of time to be passed and only then talks to a database.

It is very convenient to implement this queue via channel - so that our clients will be suspended while it is full. But how can I find out if the channel becomes full?

Of course I can create a method that sends something to the channel and then performs some checks. The next step is to encapculate it in a class derived from Channel. Still very dirty (and it's unclear how can I handle onSend/onReceive). Are there any more elegant solutins? Maybe something out-of-box?

2条回答
兄弟一词,经得起流年.
2楼-- · 2019-07-26 23:46

This is nothing out-of-the box, but the corresponding batching logic can be readily implemented using an actor. You don't really need a class for that (but you wrap this code in a class if you wish). You can use the following implementation as a template:

const val MAX_SIZE = 100 // max number of data items in batch
const val MAX_TIME = 1000 // max time (in ms) to wait

val batchActor = actor<Data> {
    val batch = mutableListOf<Data>()
    var deadline = 0L // deadline for sending this batch to DB
    while (true) {
        // when deadline is reached or size is exceeded, then force batch to DB
        val remainingTime = deadline - System.currentTimeMillis()
        if (batch.isNotEmpty() && remainingTime <= 0 || batch.size >= MAX_SIZE) {
            saveToDB(batch)
            batch.clear()
            continue
        }
        // wait until items is received or timeout reached
        select<Unit> {
            // when received -> add to batch
            channel.onReceive {
                batch.add(it)
                // init deadline on first item added to batch
                if (batch.size == 1) deadline = System.currentTimeMillis() + MAX_TIME
            }
            // when timeout is reached just finish select, note: no timeout when batch is empty
            if (batch.isNotEmpty()) onTimeout(remainingTime) {}
        }
    }
}

Now you just do batchActor.send(data) whenever you need to send anything to the database and the logic inside the actor takes care about batching and saving the resulting batches to the database.

查看更多
来,给爷笑一个
3楼-- · 2019-07-27 00:04

The Channel Interface declares an isFull property which can be queried to determine if it has reached capacity.

I can't see anyway to have a callback function to automatically invoke a function when the capacity is reached but you could periodically check this isFull property to see if it is at capacity.

查看更多
登录 后发表回答