How do I fix Dataflow being unable to serialize my DoFn?

Posted 2020-02-11 03:42

Question:

When I run my Dataflow pipeline, I get the exception below complaining that my DoFn can't be serialized. How do I fix this?

Here's the stack trace:

Caused by: java.lang.IllegalArgumentException: unable to serialize contrail.dataflow.AvroMRTransforms$AvroReducerDoFn@bba0fc2
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:81)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureSerializable(DirectPipelineRunner.java:784)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1025)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:963)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$000(ParDo.java:441)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:951)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:946)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:611)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:328)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145)
    at contrail.stages.DataflowStage.stageMain(DataflowStage.java:51)
    at contrail.stages.NonMRStage.execute(NonMRStage.java:130)
    at contrail.stages.NonMRStage.run(NonMRStage.java:157)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at contrail.stages.ValidateGraphDataflow.main(ValidateGraphDataflow.java:139)
    ... 6 more
Caused by: java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:47)
    ... 27 more

Answer 1:

To add to what Jeremy says...

Another common cause of serialization issues is using an anonymous DoFn in a non-static context. An anonymous inner class holds an implicit reference to its enclosing instance, so serializing the DoFn also tries to serialize the enclosing object.
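
To make that concrete, here is a minimal sketch that uses only plain Java serialization, so it runs without the Dataflow SDK. The names Fn, PipelineBuilder and canSerialize are hypothetical stand-ins invented for this illustration; Fn plays the role of a DoFn. The anonymous class here reads a field of its enclosing instance, so it certainly holds a reference to it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class AnonymousCaptureDemo {
    // Hypothetical stand-in for a DoFn: any Serializable function object.
    interface Fn extends Serializable {
        String apply(String input);
    }

    // Deliberately NOT Serializable, like a typical class that builds a pipeline.
    static class PipelineBuilder {
        private final String suffix;

        PipelineBuilder(String suffix) {
            this.suffix = suffix;
        }

        // Non-static context: the anonymous class keeps a reference to the
        // enclosing PipelineBuilder, which then has to be serialized too.
        Fn anonymousFn() {
            return new Fn() {
                @Override
                public String apply(String input) {
                    return input + suffix; // reads the enclosing instance's field
                }
            };
        }

        // Static context: there is no enclosing instance to capture. Only the
        // String argument is copied into the anonymous class.
        static Fn staticFn(final String suffix) {
            return new Fn() {
                @Override
                public String apply(String input) {
                    return input + suffix;
                }
            };
        }
    }

    // Returns true if Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new PipelineBuilder("!").anonymousFn())); // false
        System.out.println(canSerialize(PipelineBuilder.staticFn("!")));          // true
    }
}
```

Creating the function object from a static method (or using a static nested or top-level class) avoids the hidden reference. Making the enclosing class Serializable would also work, but it ships more state than the function needs.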



Answer 2:

If you scroll through the stack trace, one of the causes clearly identifies the data that isn't serializable.

Caused by: java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf

The problem was that my DoFn took a JobConf instance in its constructor and stored it in an instance variable. I had assumed JobConf was serializable, but it turns out it isn't.

To solve this, I did the following:

  • I marked the JobConf member variable as transient so that it wouldn't be serialized.
  • I added a separate instance variable of type byte[] to hold a serialized copy of the JobConf.
  • In my constructor, I serialized the JobConf to a byte[] and stored it in that variable.
  • I overrode startBundle and deserialized the JobConf from the byte[] there.

Here's a gist with my DoFn.
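
The four steps above can be sketched without the Dataflow or Hadoop dependencies as follows. This is an illustration under stated assumptions, not the code from the gist: FakeConf is a hypothetical stand-in for JobConf (not Serializable, but able to write itself to a DataOutput and read itself back, in the style of Hadoop's Writable), ConfCarryingFn stands in for the DoFn, and its startBundle method is called by hand here, whereas in a real pipeline the runner would call it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;

class TransientConfDemo {
    // Hypothetical stand-in for JobConf: not Serializable, but it can write
    // itself to a DataOutput and read itself back from a DataInput.
    static class FakeConf {
        final Map<String, String> values = new TreeMap<>();

        void write(DataOutput out) throws IOException {
            out.writeInt(values.size());
            for (Map.Entry<String, String> e : values.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        void readFields(DataInput in) throws IOException {
            values.clear();
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                values.put(key, in.readUTF());
            }
        }
    }

    // Hypothetical stand-in for the DoFn.
    static class ConfCarryingFn implements Serializable {
        private transient FakeConf conf; // step 1: skipped by Java serialization
        private final byte[] confBytes;  // step 2: serialized copy that travels with the object

        ConfCarryingFn(FakeConf conf) throws IOException {
            this.conf = conf;
            // Step 3: serialize the configuration to bytes in the constructor.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            conf.write(new DataOutputStream(buffer));
            this.confBytes = buffer.toByteArray();
        }

        // Step 4: rebuild the transient field before processing any elements.
        void startBundle() throws IOException {
            conf = new FakeConf();
            conf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)));
        }

        String lookup(String key) {
            return conf.values.get(key);
        }
    }

    // Simulates shipping the function object to a worker via Java serialization.
    static ConfCarryingFn roundTrip(ConfCarryingFn fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (ConfCarryingFn) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeConf conf = new FakeConf();
        conf.values.put("example.key", "31");
        ConfCarryingFn copy = roundTrip(new ConfCarryingFn(conf));
        copy.startBundle(); // conf is null after deserialization until this runs
        System.out.println(copy.lookup("example.key")); // 31
    }
}
```

The transient field is null on the deserialized copy, so anything that touches it before startBundle runs would fail with a NullPointerException. Rebuilding it in startBundle keeps that cost to once per bundle rather than once per element.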