I am trying to write a pipeline which periodically checks a Google Storage bucket for new .gz
files which are actually compressed .csv
files. Then it writes those records to a BigQuery table. The following code was working in batch mode before I added the .watchForNewFiles(...)
and .withMethod(STREAMING_INSERTS)
parts. I am expecting it to run in streaming mode with those changes. However I am getting an exception that I can't find anything related on the web. Here is my code:
public static void main(String[] args) {
DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
//.withValidation()
.as(DataflowDfpOptions.class);
Pipeline pipeline = Pipeline.create(options);
Stopwatch sw = Stopwatch.createStarted();
log.info("DFP data transfer from GS to BQ has started.");
pipeline.apply("ReadFromStorage", TextIO.read()
.from("gs://my-bucket/my-folder/*.gz")
.withCompression(Compression.GZIP)
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.never()
)
)
.apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
.to(options.getTableId())
.withMethod(STREAMING_INSERTS)
.withCreateDisposition(CREATE_NEVER)
.withWriteDisposition(WRITE_APPEND)
.withSchema(TableSchema)); //todo: use withJsonScheme(String json) method instead
pipeline.run().waitUntilFinish();
log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
}
/**
* Creates a TableRow from a CSV line
*/
private static class TableRowConverterFn extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] split = c.element().split(",");
//Ignore the header line
//Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header
if (split[0].equals("Time")) {
log.info("Skipped header");
return;
}
TableRow row = new TableRow();
for (int i = 0; i < split.length; i++) {
TableFieldSchema col = TableSchema.getFields().get(i);
//String is the most common type, putting it in the first if clause for a little bit optimization.
if (col.getType().equals("STRING")) {
row.set(col.getName(), split[i]);
} else if (col.getType().equals("INTEGER")) {
row.set(col.getName(), Long.valueOf(split[i]));
} else if (col.getType().equals("BOOLEAN")) {
row.set(col.getName(), Boolean.valueOf(split[i]));
} else if (col.getType().equals("FLOAT")) {
row.set(col.getName(), Float.valueOf(split[i]));
} else {
//Simply try to write it as a String if
//todo: Consider other BQ data types.
row.set(col.getName(), split[i]);
}
}
c.output(row);
}
}
And the stack trace:
java.lang.IllegalArgumentException: Not expecting a splittable ParDoSingle: should have been overridden
at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:167)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:145)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:86)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform(PipelineTranslation.java:87)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:165)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:684)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.diply.data.App.main(App.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
Here is my command to publish the job on Dataflow:
clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming" -Pdataflow-runner
I use apache beam version 2.5.0
. Here is the relevant section from my pom.xml
.
<properties>
<beam.version>2.5.0</beam.version>
<bigquery.version>v2-rev374-1.23.0</bigquery.version>
<google-clients.version>1.23.0</google-clients.version>
...
</properties>
Running the code with Dataflow 2.4.0 gives a more explicit error:
java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn
However, this answer suggests that this is supported since 2.2.0. This is indeed the case, and following this remark you need to add the
--streaming
option in yourDexec.args
to force it into streaming mode.I tested it with the code I supplied in the comments with both your pom and mine and both 1. produce your error without
--streaming
2. run fine with--streaming
You might want to open a github beam issue since this behavior is not documented anywhere offically as far as I know.