My pipeline: Kafka -> Dataflow streaming (Beam v2.3) -> BigQuery
Given that low latency isn't important in my case, I use FILE_LOADS to reduce costs, like this:
    BigQueryIO.writeTableRows()
      .withJsonSchema(schema)
      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
      .withMethod(Method.FILE_LOADS)
      .withTriggeringFrequency(triggeringFrequency)
      .withCustomGcsTempLocation(gcsTempLocation)
      .withNumFileShards(numFileShards)
      .withoutValidation()
      .to(new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination]() {
        def apply(element: ValueInSingleWindow[TableRow]): TableDestination = {
          ...
        }
      })
This Dataflow step introduces an ever-growing delay in the pipeline, so it can't keep up with the Kafka throughput (less than 50k events/s), even with 40 n1-standard-s4 workers. As shown in the screenshot below, the system lag for this step is very large (close to the pipeline's uptime), whereas the Kafka system lag is only a few seconds.
If I understand correctly, Dataflow writes the elements into numFileShards files in gcsTempLocation, and every triggeringFrequency a load job is started to insert them into BigQuery. For instance, if I choose a triggeringFrequency of 5 minutes, I can see (with bq ls -a -j) that every load job completes in less than 1 minute. Still, the step introduces more and more delay, so Kafka consumes fewer and fewer elements (because of backpressure). Increasing or decreasing numFileShards and triggeringFrequency doesn't fix the problem.
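To put rough numbers on my understanding above, here is a back-of-the-envelope sketch of how many elements each file shard would have to absorb between two triggers. The 50k events/s and the 5-minute trigger are the figures from my pipeline; the shard count of 100 and the names ShardLoad and elementsPerShardPerTrigger are purely hypothetical, and I am assuming the elements are spread evenly across shards, which may not hold in practice.

```scala
object ShardLoad {
  /** Elements one shard receives between two triggers,
    * assuming an even spread across all shards (an assumption, not a Beam guarantee). */
  def elementsPerShardPerTrigger(eventsPerSecond: Long,
                                 triggerSeconds: Long,
                                 numFileShards: Int): Long = {
    require(numFileShards > 0, "numFileShards must be positive")
    eventsPerSecond * triggerSeconds / numFileShards
  }

  def main(args: Array[String]): Unit = {
    // 50k events/s and a 5-minute trigger are from the question; 100 shards is a hypothetical value.
    val perShard = elementsPerShardPerTrigger(50000L, 5 * 60L, 100)
    println(s"elements per shard per trigger: $perShard")
  }
}
```

With these illustrative values each shard would take about 150,000 elements per trigger, which by itself doesn't look like enough to explain the growing lag.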
I don't manually specify any window; I just use the default one. Files are not accumulating in gcsTempLocation.
Any idea what's going wrong here?