I have a Dataflow job consisting of ReadSource, ParDo, Windowing, Insert (into a date-partitioned table in BigQuery).
It basically:
- Reads text files from a Google Cloud Storage bucket using a glob
- Processes each line: splits it on the delimiter, changes some values, gives each column a name and data type, and outputs a BigQuery table row together with a timestamp derived from the data
- Windows the rows into daily windows using the timestamp from step 2
- Writes to BigQuery, using a per-window table specification and the "dataset$datepartition" syntax to specify table and partition. Create disposition is set to CREATE_IF_NEEDED and write disposition is set to WRITE_APPEND.
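To make the last step concrete, this is roughly how the partition suffix is derived from the start of each daily window. It is a minimal standalone sketch, not my actual pipeline code: the class name `PartitionDecorator`, the method `tableSpec`, and the table name `mydataset.mytable` are made up for illustration, and it assumes the daily windows are aligned to UTC.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a "table$yyyyMMdd" partition decorator
// from the start timestamp of a daily window (assumed to be in UTC).
public class PartitionDecorator {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    public static String tableSpec(String table, Instant windowStart) {
        // The part after '$' selects the date partition to write to.
        return table + "$" + DAY.format(windowStart);
    }

    public static void main(String[] args) {
        // "mydataset.mytable" is a placeholder, not the real table name.
        System.out.println(
            tableSpec("mydataset.mytable", Instant.parse("2016-05-17T00:00:00Z")));
    }
}
```

Every row in the same daily window therefore ends up with the same table specification.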
The first three steps seem to run fine, but in most cases the job runs into problems at the last insert step, which logs the following exception:
java.lang.IllegalArgumentException: timeout value is negative
at java.lang.Thread.sleep(Native Method)
at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter.insertAll(BigQueryTableInserter.java:287)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.flushRows(BigQueryIO.java:2446)
at com.google.cloud.dataflow.sdk.io.BigQueryIO$StreamingWriteFn.finishBundle(BigQueryIO.java:2404)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.finishBundle(DoFnRunnerBase.java:158)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:196)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.finishBundle(ForwardingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.finish(ParDoOperation.java:65)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:80)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:287)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:223)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This exception is repeated ten times.
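As far as I can tell, the message itself only says that `Thread.sleep` was called with a negative duration. The standalone sketch below reproduces that outside Dataflow; the class name `NegativeSleepDemo` and the method `sleepError` are made up for illustration, and it says nothing about why `BigQueryTableInserter.insertAll` would end up with a negative value.

```java
// Hypothetical repro: shows that Thread.sleep rejects a negative duration
// with an IllegalArgumentException, independent of Dataflow or BigQuery.
public class NegativeSleepDemo {
    // Returns the exception message if sleep rejects the value, else null.
    public static String sleepError(long millis) {
        try {
            Thread.sleep(millis);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report it to the caller.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(sleepError(-1L)); // negative duration is rejected
        System.out.println(sleepError(0L));  // zero is accepted
    }
}
```

So my guess is that whatever computes the wait between insert attempts sometimes produces a negative number, but I have not confirmed this in the SDK source.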
Finally, I get "Workflow failed" as below:
Workflow failed. Causes: S04:Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/
GroupByKey/Read+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/GroupByKey/
GroupByWindow+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/Reshuffle/
ExpandIterable+Insert/DataflowPipelineRunner.BatchBigQueryIOWrite/BigQueryIO.StreamWithDeDup/ParDo(StreamingWrite)
failed.
Sometimes the same job with the same input runs without any problem, though, which makes this quite hard to debug. Where should I start?