How to parallelize HTTP requests within an Apache

Posted 2019-08-27 11:31

Question:

I have an Apache Beam pipeline running on Google Dataflow whose job is rather simple:

  • It reads individual JSON objects from Pub/Sub
  • Parses them
  • And sends them via HTTP to some API

This API requires me to send the items in batches of 75. So I built a DoFn that accumulates events in a list and publishes them via this API once it has 75. This turned out to be too slow, so I thought of executing those HTTP requests in different threads using a thread pool instead.

The implementation of what I have right now looks like this:

private class WriteFn : DoFn<TheEvent, Void>() {
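  // lateinit: these are assigned in @Setup / @StartBundle, not in the constructor
  @Transient lateinit var api: TheApi

  @Transient lateinit var currentBatch: MutableList<TheEvent>

  @Transient lateinit var executor: ExecutorService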

  @Setup
  fun setup() {
    api = buildApi()
    executor = Executors.newCachedThreadPool()
  }

  @StartBundle
  fun startBundle() {
    currentBatch = mutableListOf()
  }

  @ProcessElement
  fun processElement(processContext: ProcessContext) {
    val record = processContext.element()

    currentBatch.add(record)

    if (currentBatch.size >= 75) {
      flush()
    }
  }

  private fun flush() {
    val payloadTrack = currentBatch.toList()
    executor.submit {
      api.sendToApi(payloadTrack)
    }
    currentBatch.clear()
  }

  @FinishBundle
  fun finishBundle() {
    if (currentBatch.isNotEmpty()) {
      flush()
    }
  }

  @Teardown
  fun teardown() {
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
  }
}

This seems to work "fine" in the sense that data is making it to the API. But I don't know if this is the right approach and I have the sense that this is very slow.

The reason I think it's slow is that when load testing (by sending a few million events to Pub/Sub), the pipeline takes up to 8 times longer to forward those messages to the API (which has response times under 8 ms) than my laptop takes to feed them into Pub/Sub.

Is there any problem with my implementation? Is this the way I should be doing this?

Also... am I required to wait for all the requests to finish in my @FinishBundle method (i.e. by getting the futures returned by the executor and waiting on them)?

Answer 1:

You have two interrelated questions here:

  1. Are you doing this right / do you need to change anything?
  2. Do you need to wait in @FinishBundle?

To answer the second question first: yes. But you actually need to flush more thoroughly, as will become clear.

Once your @FinishBundle method succeeds, a Beam runner will assume the bundle has completed successfully. But your @FinishBundle only sends the requests; it does not ensure they have succeeded. So you could lose data if the requests subsequently fail. Your @FinishBundle method should block and wait for confirmation of success from TheApi. Incidentally, all of the above should be idempotent, since after finishing the bundle, an earthquake could strike and cause a retry ;-)
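As a rough illustration of the blocking behaviour described here, the sketch below keeps the futures returned by the executor and waits on all of them before the bundle is considered finished. It uses only JDK types so it runs without Beam. `BatchApi` and `BundleBatcher` are hypothetical names standing in for TheApi and the DoFn, and the two public methods only mirror where @ProcessElement and @FinishBundle would sit.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future

// Hypothetical stand-in for the API client; the real TheApi is not shown in the question.
fun interface BatchApi<T> {
    fun sendToApi(batch: List<T>)
}

// Models the DoFn lifecycle with plain JDK types (an assumption made so the sketch is runnable).
class BundleBatcher<T>(
    private val api: BatchApi<T>,
    private val executor: ExecutorService,
    private val batchSize: Int = 75,
) {
    private val currentBatch = mutableListOf<T>()
    private val pending = mutableListOf<Future<*>>()

    // Plays the role of @ProcessElement: buffer, and send once a full batch is available.
    fun processElement(element: T) {
        currentBatch.add(element)
        if (currentBatch.size >= batchSize) flush()
    }

    // Submits a copy of the current batch and remembers its future.
    private fun flush() {
        val payload = currentBatch.toList()
        currentBatch.clear()
        pending.add(executor.submit(Runnable { api.sendToApi(payload) }))
    }

    // Plays the role of @FinishBundle: send the remainder, then block until every
    // request has completed. Future.get() rethrows a failure as ExecutionException,
    // so a failed request fails the bundle instead of being silently dropped.
    fun finishBundle() {
        if (currentBatch.isNotEmpty()) flush()
        try {
            pending.forEach { it.get() }
        } finally {
            pending.clear()
        }
    }
}
```

Because a failed bundle is retried as a whole, batches that already succeeded may be sent again, which is why the idempotency remark matters.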

So to answer the first question: should you change anything? Just the above. The practice of batching requests this way can work as long as you are sure the results are committed before the bundle is committed.

You may find that doing so will cause your pipeline to slow down, because @FinishBundle happens more frequently than @Setup. To batch up requests across bundles you need to use the lower-level features of state and timers. I wrote up a contrived version of your use case at https://beam.apache.org/blog/2017/08/28/timely-processing.html. I would be quite interested in how this works for you.
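To make the idea of batching across bundles a little more concrete, here is a plain-Kotlin model of the logic: buffer elements per key, and flush either when the buffer reaches the batch size or when a deadline passes. This is not Beam API. `KeyedBatchBuffer`, `onElement`, `onTimer` and `maxDelayMillis` are hypothetical names; in a real pipeline the buffer would live in a Beam state cell and the deadline would be a timer fired by the runner, as described in the linked post.

```kotlin
// Plain-Kotlin model of "buffer in per-key state, flush on size or on a timer".
// Time is passed in explicitly so the behaviour is deterministic.
class KeyedBatchBuffer<K, T>(
    private val batchSize: Int,
    private val maxDelayMillis: Long,
    private val emit: (K, List<T>) -> Unit,
) {
    private val buffers = HashMap<K, MutableList<T>>()
    private val deadlines = HashMap<K, Long>()

    fun onElement(key: K, element: T, nowMillis: Long) {
        val buffer = buffers.getOrPut(key) { mutableListOf() }
        // The first element of a new batch arms the "timer" for that key.
        if (buffer.isEmpty()) deadlines[key] = nowMillis + maxDelayMillis
        buffer.add(element)
        if (buffer.size >= batchSize) flush(key)
    }

    // Flushes every key whose deadline has passed, even if its batch is not full.
    fun onTimer(nowMillis: Long) {
        val due = deadlines.filterValues { it <= nowMillis }.keys.toList()
        due.forEach { flush(it) }
    }

    // Emits the buffered elements for a key and clears its buffer and deadline.
    private fun flush(key: K) {
        val buffer = buffers.remove(key) ?: return
        deadlines.remove(key)
        if (buffer.isNotEmpty()) emit(key, buffer.toList())
    }
}
```

The deadline is what keeps a partially filled batch from waiting indefinitely when traffic for a key stops.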

It may simply be that the extremely low latency you are expecting, in the low millisecond range, is not available when there is a durable shuffle in your pipeline.