When I run my Dataflow pipeline, I get the exception below complaining that my DoFn can't be serialized. How do I fix this?
Here's the stack trace:
Caused by: java.lang.IllegalArgumentException: unable to serialize contrail.dataflow.AvroMRTransforms$AvroReducerDoFn@bba0fc2
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:81)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureSerializable(DirectPipelineRunner.java:784)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1025)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:963)
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$000(ParDo.java:441)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:951)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:946)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:611)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:328)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145)
at contrail.stages.DataflowStage.stageMain(DataflowStage.java:51)
at contrail.stages.NonMRStage.execute(NonMRStage.java:130)
at contrail.stages.NonMRStage.run(NonMRStage.java:157)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at contrail.stages.ValidateGraphDataflow.main(ValidateGraphDataflow.java:139)
... 6 more
Caused by: java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:47)
... 27 more
To add to what Jeremy says...
Another common cause of serialization issues is using an anonymous DoFn in a non-static context. Anonymous inner classes hold an implicit reference to the enclosing instance, so serializing the DoFn also tries to serialize that enclosing object, which fails if it isn't Serializable.
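To make that concrete, here's a minimal sketch in plain Java with no Dataflow dependency. SerializableFn, PipelineBuilder and SerializationCheck are hypothetical names standing in for a DoFn, the class that builds your pipeline, and a test helper; none of them come from the SDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a DoFn: any Serializable function object.
interface SerializableFn extends Serializable {
    String apply(String input);
}

// Hypothetical pipeline-building class. Note that it is NOT Serializable.
class PipelineBuilder {
    private final String prefix = "seen:";

    // Anonymous class created in an instance method. It reads the outer
    // field, so it holds a hidden reference to this PipelineBuilder.
    SerializableFn fromInstanceContext() {
        return new SerializableFn() {
            @Override
            public String apply(String input) {
                return prefix + input; // really PipelineBuilder.this.prefix
            }
        };
    }

    // Same logic created in a static method. There is no enclosing
    // instance; only the String argument is captured.
    static SerializableFn fromStaticContext(final String prefix) {
        return new SerializableFn() {
            @Override
            public String apply(String input) {
                return prefix + input;
            }
        };
    }
}

class SerializationCheck {
    // Returns true if Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // false: the enclosing PipelineBuilder gets dragged in
        System.out.println(canSerialize(new PipelineBuilder().fromInstanceContext()));
        // true: nothing non-serializable is captured
        System.out.println(canSerialize(PipelineBuilder.fromStaticContext("seen:")));
    }
}
```

Moving the DoFn into a static method or a static nested class, and passing in whatever values it needs, is usually enough to drop the hidden reference.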
If you scroll through the stack trace, the last "Caused by" entry identifies the class that isn't serializable. Here it is java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf.
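If you'd rather find the culprit in code than by scrolling, you can walk the cause chain yourself. This is only a sketch: OpaqueConf, HolderFn and CauseFinder are hypothetical names, and trySerialize just imitates the way the trace above wraps the underlying exception in an IllegalArgumentException.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable type, standing in for JobConf.
class OpaqueConf { }

// Hypothetical DoFn-like class that stores the non-serializable object.
class HolderFn implements Serializable {
    private final OpaqueConf conf = new OpaqueConf();
}

class CauseFinder {
    // Walks the getCause() chain and returns the first
    // NotSerializableException, or null if there is none.
    static NotSerializableException findNotSerializable(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof NotSerializableException) {
                return (NotSerializableException) cur;
            }
        }
        return null;
    }

    // Tries to serialize the object. Returns a wrapping exception on
    // failure (similar in shape to the trace above), or null on success.
    static IllegalArgumentException trySerialize(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (IOException e) {
            return new IllegalArgumentException("unable to serialize " + o, e);
        }
    }

    public static void main(String[] args) {
        Throwable wrapped = trySerialize(new HolderFn());
        NotSerializableException culprit = findNotSerializable(wrapped);
        // The message of a NotSerializableException is the offending class name.
        System.out.println("Not serializable: " + culprit.getMessage());
    }
}
```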
The problem was that my DoFn took a JobConf instance in its constructor and stored it in an instance variable. I had assumed JobConf was serializable, but it turns out it isn't.
To solve this I did the following
Here's a gist with my DoFn.
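The gist isn't reproduced here. For readers who can't open it, one common pattern for this kind of problem (not necessarily what the gist does) is to mark the non-serializable field transient, keep a serializable snapshot of its contents, and rebuild the object lazily after deserialization. The sketch below uses only the JDK: FakeJobConf is a hypothetical stand-in for org.apache.hadoop.mapred.JobConf, and ReducerFn and RoundTrip are likewise made-up names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for JobConf: holds settings, is NOT Serializable.
class FakeJobConf {
    private final Map<String, String> values = new HashMap<>();
    void set(String key, String value) { values.put(key, value); }
    String get(String key) { return values.get(key); }
    Map<String, String> asMap() { return new HashMap<>(values); }
}

// Hypothetical stand-in for the DoFn.
class ReducerFn implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable snapshot of the settings; this is what gets shipped.
    private final HashMap<String, String> settings;

    // transient: skipped by Java serialization, null after deserialization.
    private transient FakeJobConf conf;

    ReducerFn(FakeJobConf conf) {
        this.settings = new HashMap<>(conf.asMap());
        this.conf = conf;
    }

    // Rebuilds the conf on first use after deserialization.
    private FakeJobConf conf() {
        if (conf == null) {
            conf = new FakeJobConf();
            for (Map.Entry<String, String> e : settings.entrySet()) {
                conf.set(e.getKey(), e.getValue());
            }
        }
        return conf;
    }

    String lookup(String key) { return conf().get(key); }
}

class RoundTrip {
    // Serializes and deserializes an object, as a runner would when
    // shipping a DoFn to a worker.
    static <T extends Serializable> T copy(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T result = (T) in.readObject();
                return result;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        FakeJobConf conf = new FakeJobConf();
        conf.set("reducer.class", "MyReducer");
        ReducerFn shipped = copy(new ReducerFn(conf));
        System.out.println(shipped.lookup("reducer.class")); // MyReducer
    }
}
```

With the real JobConf the snapshot could be a byte[] instead of a map, since Hadoop's Configuration implements Writable, and the rebuild could happen in a DoFn lifecycle method such as startBundle rather than lazily. Treat both of those as assumptions to check against your SDK and Hadoop versions.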