I have a Dataflow job consisting of ReadSource, ParDo, Windowing, Insert (into a date-partitioned table in BigQuery).
It basically:
- Reads text files from a Google Storage bucket using a glob
- Process each line by splitting on delimiter, changing some values before giving each column a name and data type before outputting as a BigQuery table row together with a timestamp based on the data
- Window on a daily window using the timestamp from step 2
- Write to BigQuery, using Window table and "dataset$datepartition" syntax to specify table and partition. Create disposition set to CREATE_IF_NEEDED and write disposition set to WRITE_APPEND.
The first three steps seems to run fine but in most cases the job runs into problem on the last insert step which gives exceptions in the log:
java.lang.IllegalArgumentException: timeout value is negative at java.lang.Thread.sleep(Native Method)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.insertAll(BigQueryTableInserter.java:287)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.flushRows(BigQueryIO.java:2446)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.finishBundle(BigQueryIO.java:2404)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:65)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:80)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This exception is repeated ten times.
At last I get "workflow failed" as below:
Workflow failed. Causes: S04:Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/
GroupByKey/Read+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey/
GroupByWindow+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/
ExpandIterable+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/ParDo(StreamingWrite)
failed.
Sometimes the same job with the same input works without problem though which makes this quite hard to debug. So where to start?
This is a known issue with the BigQueryIO streaming write operation in Dataflow SDK for Java 1.7.0. It is fixed in the GitHub HEAD and the fix will be included in the 1.8.0 release of the Dataflow Java SDK.
For more details, see Issue #451 on the DataflowJavaSDK GitHub repository.