I'd like to write a simple batch processor class. It has a request queue and waits for this queue to become full, or for some amount of time to pass, and only then talks to a database.
It is very convenient to implement this queue via a channel, so that our clients are suspended while it is full. But how can I find out when the channel becomes full?
Of course I can create a method that sends something to the channel and then performs some checks. The next step is to encapsulate it in a class derived from Channel. That is still very dirty (and it's unclear how I can handle onSend/onReceive). Are there any more elegant solutions? Maybe something out of the box?
There is nothing out of the box, but the corresponding batching logic can be readily implemented using an actor. You don't really need a class for that (but you can wrap this code in a class if you wish). You can use the following implementation as a template:
const val MAX_SIZE = 100 // max number of data items in batch
const val MAX_TIME = 1000 // max time (in ms) to wait

val batchActor = actor<Data> {
    val batch = mutableListOf<Data>()
    var deadline = 0L // deadline for sending this batch to DB
    while (true) {
        // when deadline is reached or size is exceeded, then force batch to DB
        val remainingTime = deadline - System.currentTimeMillis()
        if (batch.isNotEmpty() && remainingTime <= 0 || batch.size >= MAX_SIZE) {
            saveToDB(batch)
            batch.clear()
            continue
        }
        // wait until an item is received or the timeout is reached
        select<Unit> {
            // when received -> add to batch
            channel.onReceive {
                batch.add(it)
                // init deadline on first item added to batch
                if (batch.size == 1) deadline = System.currentTimeMillis() + MAX_TIME
            }
            // when timeout is reached just finish select, note: no timeout when batch is empty
            if (batch.isNotEmpty()) onTimeout(remainingTime) {}
        }
    }
}
Now you just do batchActor.send(data) whenever you need to send anything to the database, and the logic inside the actor takes care of batching and saving the resulting batches to the database.
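The flush condition in the loop above relies on && binding tighter than ||, which is easy to misread. As a rough sketch, the same decision can be pulled out into a pure function and checked on its own. The name shouldFlush and its parameters are hypothetical, introduced only for illustration; they are not part of the answer's code or of kotlinx.coroutines.

```kotlin
// Hypothetical helper mirroring the condition used inside the actor loop.
// remainingTime is (deadline - current time) in milliseconds.
fun shouldFlush(batchSize: Int, remainingTime: Long, maxSize: Int = 100): Boolean =
    (batchSize > 0 && remainingTime <= 0) || batchSize >= maxSize

fun main() {
    println(shouldFlush(batchSize = 0, remainingTime = -5))    // false: empty batch, nothing to save
    println(shouldFlush(batchSize = 3, remainingTime = 250))   // false: deadline not reached yet
    println(shouldFlush(batchSize = 3, remainingTime = 0))     // true: deadline reached
    println(shouldFlush(batchSize = 100, remainingTime = 900)) // true: batch is full
}
```

Keeping the check separate like this makes it straightforward to unit test the batching rules without starting a coroutine.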
The Channel interface declares an isFull property (deprecated in later kotlinx.coroutines releases), which can be queried to determine whether the channel has reached capacity.
I can't see any way to register a callback that is invoked automatically when the capacity is reached, but you could periodically check the isFull property to see whether the channel is at capacity.
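As an alternative to polling a property, a full buffer can also be detected at send time. The sketch below assumes a kotlinx.coroutines version that provides trySend (1.5 or later); the function name fillUntilFull is hypothetical. trySend never suspends and returns a result whose isSuccess is false when the buffered channel has no free slot.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: try to send `attempts` items into a channel with the
// given buffer capacity and record which sends were accepted.
fun fillUntilFull(capacity: Int, attempts: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    // No receiver is running, so sends succeed only while the buffer has room.
    return (1..attempts).map { channel.trySend(it).isSuccess }
}

fun main() {
    println(fillUntilFull(capacity = 2, attempts = 3)) // [true, true, false]
}
```

A failed trySend here means "full right now"; the caller can then decide to flush the batch or fall back to a suspending send.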