I have an Apache Beam pipeline running on Google Dataflow whose job is rather simple:
- It reads individual JSON objects from Pub/Sub
- Parses them
- And sends them via HTTP to some API
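In code, the assembly looks roughly like this (a minimal sketch; the subscription name and the parseEvent helper are placeholders, and TheEvent would need a registered coder):

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.TypeDescriptor

fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args).create()
    val pipeline = Pipeline.create(options)
    pipeline
        // Read raw JSON strings from Pub/Sub (subscription name is a placeholder).
        .apply("ReadFromPubSub", PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my-sub"))
        // Parse each JSON string into TheEvent; parseEvent() is a placeholder.
        .apply("ParseJson", MapElements
            .into(TypeDescriptor.of(TheEvent::class.java))
            .via(SerializableFunction<String, TheEvent> { json -> parseEvent(json) }))
        // Batch and send to the API via the DoFn shown below.
        .apply("SendToApi", ParDo.of(WriteFn()))
    pipeline.run()
}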
This API requires me to send the items in batches of 75. So I built a DoFn that accumulates events in a list and publishes them via this API once there are 75. This turned out to be too slow, so I thought of executing those HTTP requests on separate threads using a thread pool instead.
The implementation of what I have right now looks like this:
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import org.apache.beam.sdk.transforms.DoFn

private class WriteFn : DoFn<TheEvent, Void>() {
    // Transient + lateinit: these are rebuilt per worker in @Setup /
    // @StartBundle, so they must not be serialized with the DoFn.
    @Transient private lateinit var api: TheApi
    @Transient private lateinit var currentBatch: MutableList<TheEvent>
    @Transient private lateinit var executor: ExecutorService

    @Setup
    fun setup() {
        api = buildApi()
        executor = Executors.newCachedThreadPool()
    }

    @StartBundle
    fun startBundle() {
        currentBatch = mutableListOf()
    }

    @ProcessElement
    fun processElement(processContext: ProcessContext) {
        val record = processContext.element()
        currentBatch.add(record)
        if (currentBatch.size >= 75) {
            flush()
        }
    }

    private fun flush() {
        // Snapshot the batch so the async task is unaffected by clear().
        val payloadTrack = currentBatch.toList()
        executor.submit {
            api.sendToApi(payloadTrack)
        }
        currentBatch.clear()
    }

    @FinishBundle
    fun finishBundle() {
        if (currentBatch.isNotEmpty()) {
            flush()
        }
    }

    @Teardown
    fun teardown() {
        executor.shutdown()
        executor.awaitTermination(30, TimeUnit.SECONDS)
    }
}
This seems to work "fine" in the sense that data is making it to the API. But I don't know if this is the right approach and I have the sense that this is very slow.
The reason I think it's slow is that when load testing (by sending a few million events to Pub/Sub), the pipeline takes up to 8 times longer to forward those messages to the API (which has response times of under 8ms) than my laptop took to feed them into Pub/Sub.
Is there any problem with my implementation? Is this the way I should be doing this?
Also... am I required to wait for all the requests to finish in my @FinishBundle method (i.e. by getting the futures returned by the executor and waiting on them)?
You have two interrelated questions here:

1. Is there any problem with this implementation / should you change anything?
2. Do you need to wait for all the requests to finish in your @FinishBundle method?

The second answer: yes. But actually you need to flush more thoroughly, as will become clear.
Once your @FinishBundle method succeeds, a Beam runner will assume the bundle has completed successfully. But your @FinishBundle only sends the requests - it does not ensure they have succeeded. So you could lose data that way if the requests subsequently fail. Your @FinishBundle method should actually be blocking and waiting for confirmation of success from TheApi. Incidentally, all of the above should be idempotent, since after finishing the bundle, an earthquake could strike and cause a retry ;-)

So to answer the first question: should you change anything? Just the above. The practice of batching requests this way can work as long as you are sure the results are committed before the bundle is committed.
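A minimal sketch of that change, assuming sendToApi() throws on failure: keep the futures the executor returns and block on all of them before the bundle finishes.

import java.util.concurrent.Future

// Additional transient state in WriteFn: the requests still in flight.
@Transient private lateinit var pendingRequests: MutableList<Future<*>>

@StartBundle
fun startBundle() {
    currentBatch = mutableListOf()
    pendingRequests = mutableListOf()
}

private fun flush() {
    val payloadTrack = currentBatch.toList()
    // Keep the future instead of fire-and-forget.
    pendingRequests.add(executor.submit { api.sendToApi(payloadTrack) })
    currentBatch.clear()
}

@FinishBundle
fun finishBundle() {
    if (currentBatch.isNotEmpty()) {
        flush()
    }
    // get() rethrows any failure, which fails the bundle and makes the
    // runner retry it instead of silently dropping data. Since a retried
    // bundle resends the same events, sendToApi() should be idempotent.
    for (future in pendingRequests) {
        future.get()
    }
    pendingRequests.clear()
}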
You may find that doing so will cause your pipeline to slow down, because @FinishBundle happens more frequently than @Setup. To batch up requests across bundles you need to use the lower-level features of state and timers. I wrote up a contrived version of your use case at https://beam.apache.org/blog/2017/08/28/timely-processing.html. I would be quite interested in how this works for you.

It may simply be that the extremely low latency you are expecting, in the low millisecond range, is not available when there is a durable shuffle in your pipeline.
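For reference, a sketch of what that looks like, in the spirit of the blog post (the shard key, the batch size of 75, the 10-second flush delay, and the coder setup are all assumptions):

import org.apache.beam.sdk.coders.VarIntCoder
import org.apache.beam.sdk.state.BagState
import org.apache.beam.sdk.state.StateSpecs
import org.apache.beam.sdk.state.TimeDomain
import org.apache.beam.sdk.state.Timer
import org.apache.beam.sdk.state.TimerSpecs
import org.apache.beam.sdk.state.ValueState
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.values.KV
import org.joda.time.Duration

// Batches across bundles: elements accumulate in per-key state and are
// flushed by size or by a processing-time timer, not once per bundle.
private class BatchingWriteFn : DoFn<KV<Int, TheEvent>, Void>() {

    @Transient private lateinit var api: TheApi

    @StateId("batch")
    private val batchSpec = StateSpecs.bag<TheEvent>()

    @StateId("count")
    private val countSpec = StateSpecs.value(VarIntCoder.of())

    @TimerId("flush")
    private val flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)

    @Setup
    fun setup() {
        api = buildApi()
    }

    @ProcessElement
    fun processElement(
        context: ProcessContext,
        @StateId("batch") batch: BagState<TheEvent>,
        @StateId("count") count: ValueState<Int>,
        @TimerId("flush") flushTimer: Timer
    ) {
        batch.add(context.element().value)
        val newCount = (count.read() ?: 0) + 1
        count.write(newCount)
        // Guarantee stragglers go out even if the batch never fills up.
        flushTimer.offset(Duration.standardSeconds(10)).setRelative()
        if (newCount >= 75) {
            sendBatch(batch, count)
        }
    }

    @OnTimer("flush")
    fun onFlush(
        @StateId("batch") batch: BagState<TheEvent>,
        @StateId("count") count: ValueState<Int>
    ) {
        sendBatch(batch, count)
    }

    // Blocking send, per the discussion above; must be idempotent.
    private fun sendBatch(batch: BagState<TheEvent>, count: ValueState<Int>) {
        val events = batch.read().toList()
        if (events.isNotEmpty()) {
            api.sendToApi(events)
        }
        batch.clear()
        count.write(0)
    }
}

Note that the input has to be keyed (here by an arbitrary shard key) because state and timers are partitioned per key and window. Beam's built-in GroupIntoBatches transform packages essentially this pattern, if you'd rather not write it by hand.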