Apache Beam IllegalArgumentException on Google Dataflow

Posted 2019-07-22 09:01

Question:

I am trying to write a pipeline that periodically checks a Google Storage bucket for new .gz files (which are actually gzip-compressed .csv files) and writes their records to a BigQuery table. The following code worked in batch mode before I added the .watchForNewFiles(...) and .withMethod(STREAMING_INSERTS) parts; with those changes I expect it to run in streaming mode. However, I now get an exception for which I can't find anything related on the web. Here is my code:

public static void main(String[] args) {       

    DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
            //.withValidation()
            .as(DataflowDfpOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    Stopwatch sw = Stopwatch.createStarted();
    log.info("DFP data transfer from GS to BQ has started.");

    pipeline.apply("ReadFromStorage", TextIO.read()
            .from("gs://my-bucket/my-folder/*.gz")
            .withCompression(Compression.GZIP)
            .watchForNewFiles(
                    // Check for new files every 30 seconds
                    Duration.standardSeconds(30),
                    // Never stop checking for new files
                    Watch.Growth.never()
            )
    )
            .apply("TransformToTableRow", ParDo.of(new TableRowConverterFn()))
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                    .to(options.getTableId())
                    .withMethod(STREAMING_INSERTS)
                    .withCreateDisposition(CREATE_NEVER)
                    .withWriteDisposition(WRITE_APPEND)
                    .withSchema(TableSchema)); //todo: use withJsonSchema(String json) method instead

    pipeline.run().waitUntilFinish();

    log.info("DFP data transfer from GS to BQ is finished in {} seconds.", sw.elapsed(TimeUnit.SECONDS));
}

/**
 * Creates a TableRow from a CSV line
 */
private static class TableRowConverterFn extends DoFn<String, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        String[] split = c.element().split(",");

        //Ignore the header line
        //Since this is going to be run in parallel, we can't guarantee that the first line passed to this method will be the header
        if (split[0].equals("Time")) {
            log.info("Skipped header");
            return;
        }

        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = TableSchema.getFields().get(i);

            //String is the most common type; check it first as a minor optimization.
            if (col.getType().equals("STRING")) {
                row.set(col.getName(), split[i]);
            } else if (col.getType().equals("INTEGER")) {
                row.set(col.getName(), Long.valueOf(split[i]));
            } else if (col.getType().equals("BOOLEAN")) {
                row.set(col.getName(), Boolean.valueOf(split[i]));
            } else if (col.getType().equals("FLOAT")) {
                row.set(col.getName(), Float.valueOf(split[i]));
            } else {
                //Otherwise, simply try to write the value as a String.
                //todo: Consider other BQ data types.
                row.set(col.getName(), split[i]);
            }
        }
        c.output(row);
    }
}
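
DataflowDfpOptions is not shown above; it is a small custom options interface, roughly along these lines (trimmed to the single custom option used here, and assuming it extends DataflowPipelineOptions):

// import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
// import org.apache.beam.sdk.options.Description;

public interface DataflowDfpOptions extends DataflowPipelineOptions {

    @Description("BigQuery table to write to, e.g. Dataset.Table")
    String getTableId();
    void setTableId(String value);
}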

And the stack trace:

java.lang.IllegalArgumentException: Not expecting a splittable ParDoSingle: should have been overridden
    at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:167)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:145)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:86)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform(PipelineTranslation.java:87)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:165)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:684)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.diply.data.App.main(App.java:66)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

Here is the command I use to submit the job to Dataflow:

clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming" -Pdataflow-runner

I use Apache Beam version 2.5.0. Here is the relevant section from my pom.xml:

 <properties>
   <beam.version>2.5.0</beam.version>
   <bigquery.version>v2-rev374-1.23.0</bigquery.version>
   <google-clients.version>1.23.0</google-clients.version>
   ...
 </properties>
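
The dependency entries themselves are omitted above; they pick up these properties roughly as follows (a trimmed sketch, not the full pom):

 <dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-core</artifactId>
   <version>${beam.version}</version>
 </dependency>
 <dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
   <version>${beam.version}</version>
 </dependency>
 <dependency>
   <groupId>com.google.apis</groupId>
   <artifactId>google-api-services-bigquery</artifactId>
   <version>${bigquery.version}</version>
 </dependency>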

Answer 1:

Running the code with Dataflow 2.4.0 gives a more explicit error: java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn

However, this answer suggests that splittable DoFns have been supported since 2.2.0. This is indeed the case, but only in streaming mode: .watchForNewFiles(...) is implemented as a splittable DoFn, so following this remark you need to add the --streaming option to your -Dexec.args to force the pipeline into streaming mode.
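
With that flag added, the submission command from the question becomes:

clean compile exec:java -Dexec.mainClass=com.my.project.App "-Dexec.args=--runner=DataflowRunner --streaming --tempLocation=gs://my-bucket/tmp --tableId=Temp.TestTable --project=my-project --jobName=dataflow-dfp-streaming" -Pdataflow-runner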

I tested it with the code I supplied in the comments, with both your pom and mine, and in both cases it:

1. produces your error without --streaming
2. runs fine with --streaming
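
If you prefer not to rely on the command-line flag, the same switch can be flipped in code via the standard StreamingOptions interface (a sketch, not part of the original post):

// import org.apache.beam.sdk.options.StreamingOptions;

DataflowDfpOptions options = PipelineOptionsFactory.fromArgs(args)
        .as(DataflowDfpOptions.class);

// Equivalent to passing --streaming on the command line.
options.as(StreamingOptions.class).setStreaming(true);

Pipeline pipeline = Pipeline.create(options);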

You might want to open a GitHub issue against Beam, since as far as I know this behavior is not officially documented anywhere.