I am trying to use AvroCoder to serialise a custom type that is passed around in PCollections in my pipeline. The custom type has a generic field (currently a String). When I run the pipeline, I get the AvroTypeException below, probably because of the generic field. Is building and passing the Avro Schema for the object the only way to get around this?
Exception in thread "main" org.apache.avro.AvroTypeException: Unknown type: T
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514)
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:593)
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:472)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:116)
I have also attached my registry code for reference.
pipelineCoderRegistry.registerCoder(GenericTypeClass.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        return AvroCoder.of(GenericTypeClass.class);
    }

    @Override
    public List<Object> getInstanceComponents(Object value) {
        return Collections.singletonList(((GenericTypeClass<Object>) value).key);
    }
});
You’ve done everything right as far as setting up the CoderFactory, but Avro’s ReflectData mechanism, which AvroCoder uses to automatically generate a schema, does not work for generic types as of this writing. This is tracked as issue AVRO-1571. See also this StackOverflow question.

In order to allow encoding of GenericTypeClass<T> for some particular values of T, you are correct that you will have to provide some explicit schema information. There are two ways to proceed:

The first approach is to provide an explicit schema on fields of type T within your GenericTypeClass<T>, for example with Avro’s @AvroSchema annotation, which takes the schema as a JSON string. The drawback is that it is limited to a finite, static union schema, and requires manually inlining the JSON schema for more complex values of T.

The second approach is to build an explicit schema in your CoderFactory and pass it to AvroCoder.of(Class, Schema). This will mostly revolve around converting a Coder<T> into a schema for T. This should be easy for basic types and manageable for POJOs that ReflectData supports. It also provides a hook for ad hoc support of more difficult cases.
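As a rough illustration of the second approach, the sketch below builds the JSON text of a record schema for GenericTypeClass<T> once T is known. It uses only the Java standard library and makes several assumptions that are not in the question: the class name GenericSchemaSketch and its methods are hypothetical, a plain Class<?> stands in for the component Coder<T>, the package name com.example is a placeholder, and GenericTypeClass is assumed to have key as its only field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: not part of Avro or the Dataflow SDK.
public class GenericSchemaSketch {
    // Avro primitive schema names for a few Java types (deliberately incomplete).
    private static final Map<Class<?>, String> PRIMITIVES = new LinkedHashMap<>();
    static {
        PRIMITIVES.put(String.class, "string");
        PRIMITIVES.put(Integer.class, "int");
        PRIMITIVES.put(Long.class, "long");
        PRIMITIVES.put(Double.class, "double");
        PRIMITIVES.put(Boolean.class, "boolean");
    }

    /** Returns the Avro schema JSON for a supported component type. */
    static String fieldSchemaFor(Class<?> componentType) {
        String name = PRIMITIVES.get(componentType);
        if (name == null) {
            // This is where ad hoc support for harder cases would be added.
            throw new IllegalArgumentException(
                "No schema mapping for " + componentType.getName());
        }
        return "\"" + name + "\"";
    }

    /**
     * Builds a record schema for GenericTypeClass<T>.
     * Assumes "key" (the field used in the question's registry code)
     * is the only field of the class.
     */
    static String recordSchemaFor(String namespace, Class<?> componentType) {
        return "{\"type\":\"record\",\"name\":\"GenericTypeClass\","
            + "\"namespace\":\"" + namespace + "\","
            + "\"fields\":[{\"name\":\"key\",\"type\":"
            + fieldSchemaFor(componentType) + "}]}";
    }

    public static void main(String[] args) {
        // "com.example" is a placeholder for the real package of GenericTypeClass.
        System.out.println(recordSchemaFor("com.example", String.class));
    }
}
```

The resulting string could then be parsed with Avro's Schema.Parser and the parsed Schema passed to AvroCoder.of(GenericTypeClass.class, schema) inside the CoderFactory. The namespace probably needs to match the real package of GenericTypeClass so that Avro can map the record back to the class, and any further fields of the class would need their own entries in the fields array.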