I have an Apache Beam pipeline running on Google Dataflow whose job is rather simple:
- It reads individual JSON objects from Pub/Sub
- Parses them
- And sends them via HTTP to some API
This API requires me to send the items in batches of 75. So I built a DoFn
that accumulates events in a list and publishes them via this API once the list reaches 75 items. This turned out to be too slow, so I thought I'd execute those HTTP requests in separate threads using a thread pool instead.
The implementation of what I have right now looks like this:
private class WriteFn : DoFn<TheEvent, Void>() {
    // lateinit because these are created per worker in @Setup / per bundle in @StartBundle
    @Transient private lateinit var api: TheApi
    @Transient private lateinit var currentBatch: MutableList<TheEvent>
    @Transient private lateinit var executor: ExecutorService

    @Setup
    fun setup() {
        api = buildApi()
        executor = Executors.newCachedThreadPool()
    }

    @StartBundle
    fun startBundle() {
        currentBatch = mutableListOf()
    }

    @ProcessElement
    fun processElement(processContext: ProcessContext) {
        val record = processContext.element()
        currentBatch.add(record)
        if (currentBatch.size >= 75) {
            flush()
        }
    }

    private fun flush() {
        // Snapshot the batch so the async task isn't affected by clear()
        val payloadTrack = currentBatch.toList()
        executor.submit {
            api.sendToApi(payloadTrack)
        }
        currentBatch.clear()
    }

    @FinishBundle
    fun finishBundle() {
        if (currentBatch.isNotEmpty()) {
            flush()
        }
    }

    @Teardown
    fun teardown() {
        executor.shutdown()
        executor.awaitTermination(30, TimeUnit.SECONDS)
    }
}
This seems to work "fine" in the sense that data is making it to the API. But I don't know if this is the right approach and I have the sense that this is very slow.
The reason I think it's slow is that, when load testing (by sending a few million events to Pub/Sub), it takes up to 8 times longer for the pipeline to forward those messages to the API (which has response times of under 8 ms) than it takes my laptop to feed them into Pub/Sub.
Is there any problem with my implementation? Is this the way I should be doing this?
Also... am I required to wait for all the requests to finish in my @FinishBundle
method (i.e. by getting the futures returned by the executor and waiting on them)?
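For context, my guess at what that waiting would look like is the untested sketch below. Here `pendingResults` is a hypothetical extra field I would add next to `executor` (plus a `java.util.concurrent.Future` import); the other names are from my code above.

    @Transient private lateinit var pendingResults: MutableList<Future<*>>

    @StartBundle
    fun startBundle() {
        currentBatch = mutableListOf()
        pendingResults = mutableListOf()
    }

    private fun flush() {
        val payloadTrack = currentBatch.toList()
        // Keep the Future so finishBundle can wait for (and surface errors from) the request
        pendingResults.add(executor.submit {
            api.sendToApi(payloadTrack)
        })
        currentBatch.clear()
    }

    @FinishBundle
    fun finishBundle() {
        if (currentBatch.isNotEmpty()) {
            flush()
        }
        // Block until every request submitted during this bundle has completed;
        // Future.get() rethrows failures wrapped in ExecutionException
        pendingResults.forEach { it.get() }
        pendingResults.clear()
    }

Is something along those lines what's expected, or is it unnecessary?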